| Index: appengine/monorail/services/issue_svc.py
|
| diff --git a/appengine/monorail/services/issue_svc.py b/appengine/monorail/services/issue_svc.py
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..6a98c459c3472a2978ecb199c8307ddc019bab3e
|
| --- /dev/null
|
| +++ b/appengine/monorail/services/issue_svc.py
|
| @@ -0,0 +1,2557 @@
|
| +# Copyright 2016 The Chromium Authors. All rights reserved.
|
| +# Use of this source code is govered by a BSD-style
|
| +# license that can be found in the LICENSE file or at
|
| +# https://developers.google.com/open-source/licenses/bsd
|
| +
|
| +"""A set of functions that provide persistence for Monorail issue tracking.
|
| +
|
| +This module provides functions to get, update, create, and (in some
|
| +cases) delete each type of business object. It provides a logical
|
| +persistence layer on top of an SQL database.
|
| +
|
| +Business objects are described in tracker_pb2.py and tracker_bizobj.py.
|
| +"""
|
| +
|
| +import collections
|
| +import json
|
| +import logging
|
| +import os
|
| +import time
|
| +import uuid
|
| +
|
| +from google.appengine.api import app_identity
|
| +from google.appengine.api import images
|
| +from third_party import cloudstorage
|
| +
|
| +import settings
|
| +from features import filterrules_helpers
|
| +from framework import framework_bizobj
|
| +from framework import framework_constants
|
| +from framework import framework_helpers
|
| +from framework import gcs_helpers
|
| +from framework import permissions
|
| +from framework import sql
|
| +from infra_libs import ts_mon
|
| +from proto import project_pb2
|
| +from proto import tracker_pb2
|
| +from services import caches
|
| +from services import tracker_fulltext
|
| +from tracker import tracker_bizobj
|
| +from tracker import tracker_helpers
|
| +
|
| +
|
| +ISSUE_TABLE_NAME = 'Issue'
|
| +ISSUESUMMARY_TABLE_NAME = 'IssueSummary'
|
| +ISSUE2LABEL_TABLE_NAME = 'Issue2Label'
|
| +ISSUE2COMPONENT_TABLE_NAME = 'Issue2Component'
|
| +ISSUE2CC_TABLE_NAME = 'Issue2Cc'
|
| +ISSUE2NOTIFY_TABLE_NAME = 'Issue2Notify'
|
| +ISSUE2FIELDVALUE_TABLE_NAME = 'Issue2FieldValue'
|
| +COMMENT_TABLE_NAME = 'Comment'
|
| +ATTACHMENT_TABLE_NAME = 'Attachment'
|
| +ISSUERELATION_TABLE_NAME = 'IssueRelation'
|
| +DANGLINGRELATION_TABLE_NAME = 'DanglingIssueRelation'
|
| +ISSUEUPDATE_TABLE_NAME = 'IssueUpdate'
|
| +ISSUEFORMERLOCATIONS_TABLE_NAME = 'IssueFormerLocations'
|
| +REINDEXQUEUE_TABLE_NAME = 'ReindexQueue'
|
| +LOCALIDCOUNTER_TABLE_NAME = 'LocalIDCounter'
|
| +
|
| +ISSUE_COLS = [
|
| + 'id', 'project_id', 'local_id', 'status_id', 'owner_id', 'reporter_id',
|
| + 'opened', 'closed', 'modified', 'derived_owner_id', 'derived_status_id',
|
| + 'deleted', 'star_count', 'attachment_count', 'is_spam']
|
| +ISSUESUMMARY_COLS = ['issue_id', 'summary']
|
| +ISSUE2LABEL_COLS = ['issue_id', 'label_id', 'derived']
|
| +ISSUE2COMPONENT_COLS = ['issue_id', 'component_id', 'derived']
|
| +ISSUE2CC_COLS = ['issue_id', 'cc_id', 'derived']
|
| +ISSUE2NOTIFY_COLS = ['issue_id', 'email']
|
| +ISSUE2FIELDVALUE_COLS = [
|
| + 'issue_id', 'field_id', 'int_value', 'str_value', 'user_id', 'derived']
|
| +COMMENT_COLS = [
|
| + 'Comment.id', 'issue_id', 'created', 'Comment.project_id', 'commenter_id',
|
| + 'content', 'inbound_message', 'was_escaped', 'deleted_by',
|
| + 'Comment.is_spam']
|
| +ABBR_COMMENT_COLS = ['Comment.id', 'commenter_id', 'deleted_by']
|
| +ATTACHMENT_COLS = [
|
| + 'id', 'issue_id', 'comment_id', 'filename', 'filesize', 'mimetype',
|
| + 'deleted', 'gcs_object_id']
|
| +ISSUERELATION_COLS = ['issue_id', 'dst_issue_id', 'kind']
|
| +DANGLINGRELATION_COLS = [
|
| + 'issue_id', 'dst_issue_project', 'dst_issue_local_id', 'kind']
|
| +ISSUEUPDATE_COLS = [
|
| + 'id', 'issue_id', 'comment_id', 'field', 'old_value', 'new_value',
|
| + 'added_user_id', 'removed_user_id', 'custom_field_name']
|
| +ISSUEFORMERLOCATIONS_COLS = ['issue_id', 'project_id', 'local_id']
|
| +REINDEXQUEUE_COLS = ['issue_id', 'created']
|
| +
|
| +CHUNK_SIZE = 1000
|
| +
|
| +
|
| +class IssueIDTwoLevelCache(caches.AbstractTwoLevelCache):
|
| + """Class to manage RAM and memcache for Issue IDs."""
|
| +
|
| + def __init__(self, cache_manager, issue_service):
|
| + super(IssueIDTwoLevelCache, self).__init__(
|
| + cache_manager, 'issue_id', 'issue_id:', int,
|
| + max_size=settings.issue_cache_max_size, use_value_centric_cache=True)
|
| + self.issue_service = issue_service
|
| +
|
| + def _DeserializeIssueIDs(self, project_local_issue_ids):
|
| + """Convert database rows into a dict {(project_id, local_id): issue_id}."""
|
| + return {(project_id, local_id): issue_id
|
| + for (project_id, local_id, issue_id) in project_local_issue_ids}
|
| +
|
| + def FetchItems(self, cnxn, keys):
|
| + """On RAM and memcache miss, hit the database."""
|
| + local_ids_by_pid = collections.defaultdict(list)
|
| + for project_id, local_id in keys:
|
| + local_ids_by_pid[project_id].append(local_id)
|
| +
|
| + where = [] # We OR per-project pairs of conditions together.
|
| + for project_id, local_ids_in_project in local_ids_by_pid.iteritems():
|
| + term_str = ('(Issue.project_id = %%s AND Issue.local_id IN (%s))' %
|
| + sql.PlaceHolders(local_ids_in_project))
|
| + where.append((term_str, [project_id] + local_ids_in_project))
|
| +
|
| + rows = self.issue_service.issue_tbl.Select(
|
| + cnxn, cols=['project_id', 'local_id', 'id'],
|
| + where=where, or_where_conds=True)
|
| + return self._DeserializeIssueIDs(rows)
|
| +
|
| + def _KeyToStr(self, key):
|
| + """This cache uses pairs of ints as keys. Convert them to strings."""
|
| + return '%d,%d' % key
|
| +
|
| + def _StrToKey(self, key_str):
|
| + """This cache uses pairs of ints as keys. Convert them from strings."""
|
| + project_id_str, local_id_str = key_str.split(',')
|
| + return int(project_id_str), int(local_id_str)
|
| +
|
| +
|
| +class IssueTwoLevelCache(caches.AbstractTwoLevelCache):
|
| + """Class to manage RAM and memcache for Issue PBs."""
|
| +
|
| + def __init__(
|
| + self, cache_manager, issue_service, project_service, config_service):
|
| + super(IssueTwoLevelCache, self).__init__(
|
| + cache_manager, 'issue', 'issue:', tracker_pb2.Issue,
|
| + max_size=settings.issue_cache_max_size)
|
| + self.issue_service = issue_service
|
| + self.project_service = project_service
|
| + self.config_service = config_service
|
| +
|
| + def _UnpackIssue(self, cnxn, issue_row):
|
| + """Partially construct an issue object using info from a DB row."""
|
| + (issue_id, project_id, local_id, status_id, owner_id, reporter_id,
|
| + opened, closed, modified, derived_owner_id, derived_status_id,
|
| + deleted, star_count, attachment_count, is_spam) = issue_row
|
| +
|
| + issue = tracker_pb2.Issue()
|
| + project = self.project_service.GetProject(cnxn, project_id)
|
| + issue.project_name = project.project_name
|
| + issue.issue_id = issue_id
|
| + issue.project_id = project_id
|
| + issue.local_id = local_id
|
| + if status_id is not None:
|
| + status = self.config_service.LookupStatus(cnxn, project_id, status_id)
|
| + issue.status = status
|
| + issue.owner_id = owner_id or 0
|
| + issue.reporter_id = reporter_id or 0
|
| + issue.derived_owner_id = derived_owner_id or 0
|
| + if derived_status_id is not None:
|
| + derived_status = self.config_service.LookupStatus(
|
| + cnxn, project_id, derived_status_id)
|
| + issue.derived_status = derived_status
|
| + issue.deleted = bool(deleted)
|
| + if opened:
|
| + issue.opened_timestamp = opened
|
| + if closed:
|
| + issue.closed_timestamp = closed
|
| + if modified:
|
| + issue.modified_timestamp = modified
|
| + issue.star_count = star_count
|
| + issue.attachment_count = attachment_count
|
| + issue.is_spam = bool(is_spam)
|
| + return issue
|
| +
|
| + def _UnpackFieldValue(self, fv_row):
|
| + """Construct a field value object from a DB row."""
|
| + (issue_id, field_id, int_value, str_value, user_id, derived) = fv_row
|
| + fv = tracker_bizobj.MakeFieldValue(
|
| + field_id, int_value, str_value, user_id, bool(derived))
|
| + return fv, issue_id
|
| +
|
| + def _DeserializeIssues(
|
| + self, cnxn, issue_rows, summary_rows, label_rows, component_rows,
|
| + cc_rows, notify_rows, fieldvalue_rows, relation_rows,
|
| + dangling_relation_rows):
|
| + """Convert the given DB rows into a dict of Issue PBs."""
|
| + results_dict = {}
|
| + for issue_row in issue_rows:
|
| + issue = self._UnpackIssue(cnxn, issue_row)
|
| + results_dict[issue.issue_id] = issue
|
| +
|
| + for issue_id, summary in summary_rows:
|
| + results_dict[issue_id].summary = summary
|
| +
|
| + # TODO(jrobbins): it would be nice to order labels by rank and name.
|
| + for issue_id, label_id, derived in label_rows:
|
| + issue = results_dict.get(issue_id)
|
| + if not issue:
|
| + logging.info('Got label for an unknown issue: %r %r',
|
| + label_rows, issue_rows)
|
| + continue
|
| + label = self.config_service.LookupLabel(cnxn, issue.project_id, label_id)
|
| + assert label, ('Label ID %r on IID %r not found in project %r' %
|
| + (label_id, issue_id, issue.project_id))
|
| + if derived:
|
| + results_dict[issue_id].derived_labels.append(label)
|
| + else:
|
| + results_dict[issue_id].labels.append(label)
|
| +
|
| + for issue_id, component_id, derived in component_rows:
|
| + if derived:
|
| + results_dict[issue_id].derived_component_ids.append(component_id)
|
| + else:
|
| + results_dict[issue_id].component_ids.append(component_id)
|
| +
|
| + for issue_id, user_id, derived in cc_rows:
|
| + if derived:
|
| + results_dict[issue_id].derived_cc_ids.append(user_id)
|
| + else:
|
| + results_dict[issue_id].cc_ids.append(user_id)
|
| +
|
| + for issue_id, email in notify_rows:
|
| + results_dict[issue_id].derived_notify_addrs.append(email)
|
| +
|
| + for fv_row in fieldvalue_rows:
|
| + fv, issue_id = self._UnpackFieldValue(fv_row)
|
| + results_dict[issue_id].field_values.append(fv)
|
| +
|
| + for issue_id, dst_issue_id, kind in relation_rows:
|
| + src_issue = results_dict.get(issue_id)
|
| + dst_issue = results_dict.get(dst_issue_id)
|
| + assert src_issue or dst_issue, (
|
| + 'Neither source issue %r nor dest issue %r was found' %
|
| + (issue_id, dst_issue_id))
|
| + if src_issue:
|
| + if kind == 'blockedon':
|
| + src_issue.blocked_on_iids.append(dst_issue_id)
|
| + elif kind == 'mergedinto':
|
| + src_issue.merged_into = dst_issue_id
|
| + else:
|
| + logging.info('unknown relation kind %r', kind)
|
| + continue
|
| +
|
| + if dst_issue:
|
| + if kind == 'blockedon':
|
| + dst_issue.blocking_iids.append(issue_id)
|
| +
|
| + for issue_id, dst_issue_proj, dst_issue_id, kind in dangling_relation_rows:
|
| + src_issue = results_dict.get(issue_id)
|
| + if kind == 'blockedon':
|
| + src_issue.dangling_blocked_on_refs.append(
|
| + tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, dst_issue_id))
|
| + elif kind == 'blocking':
|
| + src_issue.dangling_blocking_refs.append(
|
| + tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, dst_issue_id))
|
| + else:
|
| + logging.warn('unhandled danging relation kind %r', kind)
|
| + continue
|
| +
|
| + return results_dict
|
| +
|
| + # Note: sharding is used to here to allow us to load issues from the replicas
|
| + # without placing load on the master. Writes are not sharded.
|
| + # pylint: disable=arguments-differ
|
| + def FetchItems(self, cnxn, issue_ids, shard_id=None):
|
| + """Retrieve and deserialize issues."""
|
| + issue_rows = self.issue_service.issue_tbl.Select(
|
| + cnxn, cols=ISSUE_COLS, id=issue_ids, shard_id=shard_id)
|
| +
|
| + summary_rows = self.issue_service.issuesummary_tbl.Select(
|
| + cnxn, cols=ISSUESUMMARY_COLS, shard_id=shard_id, issue_id=issue_ids)
|
| + label_rows = self.issue_service.issue2label_tbl.Select(
|
| + cnxn, cols=ISSUE2LABEL_COLS, shard_id=shard_id, issue_id=issue_ids)
|
| + component_rows = self.issue_service.issue2component_tbl.Select(
|
| + cnxn, cols=ISSUE2COMPONENT_COLS, shard_id=shard_id, issue_id=issue_ids)
|
| + cc_rows = self.issue_service.issue2cc_tbl.Select(
|
| + cnxn, cols=ISSUE2CC_COLS, shard_id=shard_id, issue_id=issue_ids)
|
| + notify_rows = self.issue_service.issue2notify_tbl.Select(
|
| + cnxn, cols=ISSUE2NOTIFY_COLS, shard_id=shard_id, issue_id=issue_ids)
|
| + fieldvalue_rows = self.issue_service.issue2fieldvalue_tbl.Select(
|
| + cnxn, cols=ISSUE2FIELDVALUE_COLS, shard_id=shard_id,
|
| + issue_id=issue_ids)
|
| + if issue_ids:
|
| + ph = sql.PlaceHolders(issue_ids)
|
| + relation_rows = self.issue_service.issuerelation_tbl.Select(
|
| + cnxn, cols=ISSUERELATION_COLS,
|
| + where=[('(issue_id IN (%s) OR dst_issue_id IN (%s))' % (ph, ph),
|
| + issue_ids + issue_ids)])
|
| + dangling_relation_rows = self.issue_service.danglingrelation_tbl.Select(
|
| + cnxn, cols=DANGLINGRELATION_COLS, issue_id=issue_ids)
|
| + else:
|
| + relation_rows = []
|
| + dangling_relation_rows = []
|
| +
|
| + return self._DeserializeIssues(
|
| + cnxn, issue_rows, summary_rows, label_rows, component_rows, cc_rows,
|
| + notify_rows, fieldvalue_rows, relation_rows, dangling_relation_rows)
|
| +
|
| +
|
| +class IssueService(object):
|
| + """The persistence layer for Monorail's issues, comments, and attachments."""
|
| + spam_labels = ts_mon.CounterMetric('monorail/issue_svc/spam_label')
|
| +
|
| + def __init__(self, project_service, config_service, cache_manager):
|
| + """Initialize this object so that it is ready to use.
|
| +
|
| + Args:
|
| + project_service: services object for project info.
|
| + config_service: services object for tracker configuration info.
|
| + cache_manager: local cache with distributed invalidation.
|
| + """
|
| + # Tables that represent issue data.
|
| + self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE_NAME)
|
| + self.issuesummary_tbl = sql.SQLTableManager(ISSUESUMMARY_TABLE_NAME)
|
| + self.issue2label_tbl = sql.SQLTableManager(ISSUE2LABEL_TABLE_NAME)
|
| + self.issue2component_tbl = sql.SQLTableManager(ISSUE2COMPONENT_TABLE_NAME)
|
| + self.issue2cc_tbl = sql.SQLTableManager(ISSUE2CC_TABLE_NAME)
|
| + self.issue2notify_tbl = sql.SQLTableManager(ISSUE2NOTIFY_TABLE_NAME)
|
| + self.issue2fieldvalue_tbl = sql.SQLTableManager(ISSUE2FIELDVALUE_TABLE_NAME)
|
| + self.issuerelation_tbl = sql.SQLTableManager(ISSUERELATION_TABLE_NAME)
|
| + self.danglingrelation_tbl = sql.SQLTableManager(DANGLINGRELATION_TABLE_NAME)
|
| + self.issueformerlocations_tbl = sql.SQLTableManager(
|
| + ISSUEFORMERLOCATIONS_TABLE_NAME)
|
| +
|
| + # Tables that represent comments.
|
| + self.comment_tbl = sql.SQLTableManager(COMMENT_TABLE_NAME)
|
| + self.issueupdate_tbl = sql.SQLTableManager(ISSUEUPDATE_TABLE_NAME)
|
| + self.attachment_tbl = sql.SQLTableManager(ATTACHMENT_TABLE_NAME)
|
| +
|
| + # Tables for cron tasks.
|
| + self.reindexqueue_tbl = sql.SQLTableManager(REINDEXQUEUE_TABLE_NAME)
|
| +
|
| + # Tables for generating sequences of local IDs.
|
| + self.localidcounter_tbl = sql.SQLTableManager(LOCALIDCOUNTER_TABLE_NAME)
|
| +
|
| + # Like a dictionary {(project_id, local_id): issue_id}
|
| + # Use value centric cache here because we cannot store a tuple in the
|
| + # Invalidate table.
|
| + self.issue_id_2lc = IssueIDTwoLevelCache(cache_manager, self)
|
| + # Like a dictionary {issue_id: issue}
|
| + self.issue_2lc = IssueTwoLevelCache(
|
| + cache_manager, self, project_service, config_service)
|
| +
|
| + self._config_service = config_service
|
| +
|
| + ### Issue ID lookups
|
| +
|
| + def LookupIssueIDs(self, cnxn, project_local_id_pairs):
|
| + """Find the global issue IDs given the project ID and local ID of each."""
|
| + issue_id_dict, _misses = self.issue_id_2lc.GetAll(
|
| + cnxn, project_local_id_pairs)
|
| +
|
| + # Put the Issue IDs in the order specified by project_local_id_pairs
|
| + issue_ids = [issue_id_dict[pair] for pair in project_local_id_pairs
|
| + if pair in issue_id_dict]
|
| +
|
| + return issue_ids
|
| +
|
| + def LookupIssueID(self, cnxn, project_id, local_id):
|
| + """Find the global issue ID given the project ID and local ID."""
|
| + issue_ids = self.LookupIssueIDs(cnxn, [(project_id, local_id)])
|
| + try:
|
| + return issue_ids[0]
|
| + except IndexError:
|
| + raise NoSuchIssueException()
|
| +
|
| + def ResolveIssueRefs(
|
| + self, cnxn, ref_projects, default_project_name, refs):
|
| + """Look up all the referenced issues and return their issue_ids.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + ref_projects: pre-fetched dict {project_name: project} of all projects
|
| + mentioned in the refs as well as the default project.
|
| + default_project_name: string name of the current project, this is used
|
| + when the project_name in a ref is None.
|
| + refs: list of (project_name, local_id) pairs. These are parsed from
|
| + textual references in issue descriptions, comments, and the input
|
| + in the blocked-on field.
|
| +
|
| + Returns:
|
| + A list of issue_ids for all the referenced issues. References to issues
|
| + in deleted projects and any issues not found are simply ignored.
|
| + """
|
| + if not refs:
|
| + return []
|
| +
|
| + project_local_id_pairs = []
|
| + for project_name, local_id in refs:
|
| + project = ref_projects.get(project_name or default_project_name)
|
| + if not project or project.state == project_pb2.ProjectState.DELETABLE:
|
| + continue # ignore any refs to issues in deleted projects
|
| + project_local_id_pairs.append((project.project_id, local_id))
|
| +
|
| + issue_ids = self.LookupIssueIDs(cnxn, project_local_id_pairs)
|
| + return issue_ids
|
| +
|
| + ### Issue objects
|
| +
|
| + def CreateIssue(
|
| + self, cnxn, services, project_id, summary, status,
|
| + owner_id, cc_ids, labels, field_values, component_ids, reporter_id,
|
| + marked_description, blocked_on=None, blocking=None, attachments=None,
|
| + timestamp=None, index_now=True):
|
| + """Create and store a new issue with all the given information.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + services: persistence layer for users, issues, and projects.
|
| + project_id: int ID for the current project.
|
| + summary: one-line summary string summarizing this issue.
|
| + status: string issue status value. E.g., 'New'.
|
| + owner_id: user ID of the issue owner.
|
| + cc_ids: list of user IDs for users to be CC'd on changes.
|
| + labels: list of label strings. E.g., 'Priority-High'.
|
| + field_values: list of FieldValue PBs.
|
| + component_ids: list of int component IDs.
|
| + reporter_id: user ID of the user who reported the issue.
|
| + marked_description: issue description with initial HTML markup.
|
| + blocked_on: list of issue_ids that this issue is blocked on.
|
| + blocking: list of issue_ids that this issue blocks.
|
| + attachments: [(filename, contents, mimetype),...] attachments uploaded at
|
| + the time the comment was made.
|
| + timestamp: time that the issue was entered, defaults to now.
|
| + index_now: True if the issue should be updated in the full text index.
|
| +
|
| + Returns:
|
| + The integer local ID of the new issue.
|
| + """
|
| + config = self._config_service.GetProjectConfig(cnxn, project_id)
|
| + iids_to_invalidate = set()
|
| +
|
| + status = framework_bizobj.CanonicalizeLabel(status)
|
| + labels = [framework_bizobj.CanonicalizeLabel(l) for l in labels]
|
| + labels = [l for l in labels if l]
|
| +
|
| + issue = tracker_pb2.Issue()
|
| + issue.project_id = project_id
|
| + issue.summary = summary
|
| + issue.status = status
|
| + issue.owner_id = owner_id
|
| + issue.cc_ids.extend(cc_ids)
|
| + issue.labels.extend(labels)
|
| + issue.field_values.extend(field_values)
|
| + issue.component_ids.extend(component_ids)
|
| + issue.reporter_id = reporter_id
|
| + if blocked_on is not None:
|
| + iids_to_invalidate.update(blocked_on)
|
| + issue.blocked_on_iids = blocked_on
|
| + if blocking is not None:
|
| + iids_to_invalidate.update(blocking)
|
| + issue.blocking_iids = blocking
|
| + if attachments:
|
| + issue.attachment_count = len(attachments)
|
| + timestamp = timestamp or int(time.time())
|
| + issue.opened_timestamp = timestamp
|
| + issue.modified_timestamp = timestamp
|
| +
|
| + comment = self._MakeIssueComment(
|
| + project_id, reporter_id, marked_description,
|
| + attachments=attachments, timestamp=timestamp, was_escaped=True)
|
| +
|
| + # Set the closed_timestamp both before and after filter rules.
|
| + if not tracker_helpers.MeansOpenInProject(
|
| + tracker_bizobj.GetStatus(issue), config):
|
| + issue.closed_timestamp = timestamp
|
| + filterrules_helpers.ApplyFilterRules(cnxn, services, issue, config)
|
| + if not tracker_helpers.MeansOpenInProject(
|
| + tracker_bizobj.GetStatus(issue), config):
|
| + issue.closed_timestamp = timestamp
|
| +
|
| + classification = services.spam.ClassifyIssue(issue, comment)
|
| +
|
| + label = classification['outputLabel']
|
| + logging.info('issue/comment classification: %s' % classification)
|
| + score = 0
|
| + for output in classification['outputMulti']:
|
| + if output['label'] == label:
|
| + score = float(output['score'])
|
| +
|
| + self.spam_labels.increment({'type': label})
|
| +
|
| + if label == 'spam' and score > settings.classifier_spam_thresh:
|
| + # Must be negative so as not to use up actual local_ids.
|
| + # This can be fixed later if a human declares it to be ham.
|
| + issue.local_id = self.AllocateNextSpamLocalID(cnxn, project_id)
|
| + issue.is_spam = True
|
| + else:
|
| + issue.local_id = self.AllocateNextLocalID(cnxn, project_id)
|
| +
|
| + issue_id = self.InsertIssue(cnxn, issue)
|
| + comment.issue_id = issue_id
|
| + self.InsertComment(cnxn, comment)
|
| +
|
| + issue.issue_id = issue_id
|
| + services.spam.RecordClassifierIssueVerdict(
|
| + cnxn, issue, label=='spam', score)
|
| +
|
| + if permissions.HasRestrictions(issue, 'view'):
|
| + self._config_service.InvalidateMemcache(
|
| + [issue], key_prefix='nonviewable:')
|
| +
|
| + # Add a comment to existing issues saying they are now blocking or
|
| + # blocked on this issue.
|
| + blocked_add_issues = self.GetIssues(cnxn, blocked_on or [])
|
| + for add_issue in blocked_add_issues:
|
| + self.CreateIssueComment(
|
| + cnxn, add_issue.project_id, add_issue.local_id, reporter_id,
|
| + content='',
|
| + amendments=[tracker_bizobj.MakeBlockingAmendment(
|
| + [(issue.project_name, issue.local_id)], [],
|
| + default_project_name=add_issue.project_name)])
|
| + blocking_add_issues = self.GetIssues(cnxn, blocking or [])
|
| + for add_issue in blocking_add_issues:
|
| + self.CreateIssueComment(
|
| + cnxn, add_issue.project_id, add_issue.local_id, reporter_id,
|
| + content='',
|
| + amendments=[tracker_bizobj.MakeBlockedOnAmendment(
|
| + [(issue.project_name, issue.local_id)], [],
|
| + default_project_name=add_issue.project_name)])
|
| +
|
| + self._UpdateIssuesModified(
|
| + cnxn, iids_to_invalidate, modified_timestamp=timestamp)
|
| +
|
| + if index_now:
|
| + tracker_fulltext.IndexIssues(
|
| + cnxn, [issue], services.user, self, self._config_service)
|
| +
|
| + return issue.local_id
|
| +
|
| + def AllocateNewLocalIDs(self, cnxn, issues):
|
| + # Filter to just the issues that need new local IDs.
|
| + issues = [issue for issue in issues if issue.local_id < 0]
|
| +
|
| + for issue in issues:
|
| + if issue.local_id < 0:
|
| + issue.local_id = self.AllocateNextLocalID(cnxn, issue.project_id)
|
| +
|
| + self.UpdateIssues(cnxn, issues)
|
| +
|
| + logging.info("AllocateNewLocalIDs")
|
| +
|
| + def GetAllIssuesInProject(self, cnxn, project_id, min_local_id=None):
|
| + """Special query to efficiently get ALL issues in a project.
|
| +
|
| + This is not done while the user is waiting, only by backround tasks.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: the ID of the project.
|
| + min_local_id: optional int to start at.
|
| +
|
| + Returns:
|
| + A list of Issue protocol buffers for all issues.
|
| + """
|
| + all_local_ids = self.GetAllLocalIDsInProject(
|
| + cnxn, project_id, min_local_id=min_local_id)
|
| + return self.GetIssuesByLocalIDs(cnxn, project_id, all_local_ids)
|
| +
|
| + def GetAnyOnHandIssue(self, issue_ids, start=None, end=None):
|
| + """Get any one issue from RAM or memcache, otherwise return None."""
|
| + return self.issue_2lc.GetAnyOnHandItem(issue_ids, start=start, end=end)
|
| +
|
| + def GetIssuesDict(self, cnxn, issue_ids, use_cache=True, shard_id=None):
|
| + """Get a dict {iid: issue} from the DB or cache."""
|
| + issue_dict, _missed_iids = self.issue_2lc.GetAll(
|
| + cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
|
| + return issue_dict
|
| +
|
| + def GetIssues(self, cnxn, issue_ids, use_cache=True, shard_id=None):
|
| + """Get a list of Issue PBs from the DB or cache.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + issue_ids: integer global issue IDs of the issues.
|
| + use_cache: optional boolean to turn off using the cache.
|
| + shard_id: optional int shard_id to limit retrieval.
|
| +
|
| + Returns:
|
| + A list of Issue PBs in the same order as the given issue_ids.
|
| + """
|
| + issue_dict = self.GetIssuesDict(
|
| + cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id)
|
| +
|
| + # Return a list that is ordered the same as the given issue_ids.
|
| + issue_list = [issue_dict[issue_id] for issue_id in issue_ids
|
| + if issue_id in issue_dict]
|
| +
|
| + return issue_list
|
| +
|
| + def GetIssue(self, cnxn, issue_id):
|
| + """Get one Issue PB from the DB.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + issue_id: integer global issue ID of the issue.
|
| +
|
| + Returns:
|
| + The requested Issue protocol buffer.
|
| +
|
| + Raises:
|
| + NoSuchIssueException: the issue was not found.
|
| + """
|
| + issues = self.GetIssues(cnxn, [issue_id])
|
| + try:
|
| + return issues[0]
|
| + except IndexError:
|
| + raise NoSuchIssueException()
|
| +
|
| + def GetIssuesByLocalIDs(
|
| + self, cnxn, project_id, local_id_list, shard_id=None):
|
| + """Get all the requested issues.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: int ID of the project to which the issues belong.
|
| + local_id_list: list of integer local IDs for the requested issues.
|
| + shard_id: optional int shard_id to choose a replica.
|
| +
|
| + Returns:
|
| + List of Issue PBs for the requested issues. The result Issues
|
| + will be ordered in the same order as local_id_list.
|
| + """
|
| + issue_ids_to_fetch = self.LookupIssueIDs(
|
| + cnxn, [(project_id, local_id) for local_id in local_id_list])
|
| + issues = self.GetIssues(cnxn, issue_ids_to_fetch, shard_id=shard_id)
|
| + return issues
|
| +
|
| + def GetIssueByLocalID(self, cnxn, project_id, local_id):
|
| + """Get one Issue PB from the DB.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: the ID of the project to which the issue belongs.
|
| + local_id: integer local ID of the issue.
|
| +
|
| + Returns:
|
| + The requested Issue protocol buffer.
|
| + """
|
| + issues = self.GetIssuesByLocalIDs(cnxn, project_id, [local_id])
|
| + try:
|
| + return issues[0]
|
| + except IndexError:
|
| + raise NoSuchIssueException('The issue %s:%d does not exist.' % (
|
| + project_id, local_id))
|
| +
|
| + def GetOpenAndClosedIssues(self, cnxn, issue_ids):
|
| + """Return the requested issues in separate open and closed lists.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + issue_ids: list of int issue issue_ids.
|
| +
|
| + Returns:
|
| + A pair of lists, the first with open issues, second with closed issues.
|
| + """
|
| + if not issue_ids:
|
| + return [], [] # make one common case efficient
|
| +
|
| + issues = self.GetIssues(cnxn, issue_ids)
|
| + project_ids = {issue.project_id for issue in issues}
|
| + configs = self._config_service.GetProjectConfigs(cnxn, project_ids)
|
| + open_issues = []
|
| + closed_issues = []
|
| + for issue in issues:
|
| + config = configs[issue.project_id]
|
| + if tracker_helpers.MeansOpenInProject(
|
| + tracker_bizobj.GetStatus(issue), config):
|
| + open_issues.append(issue)
|
| + else:
|
| + closed_issues.append(issue)
|
| +
|
| + return open_issues, closed_issues
|
| +
|
| + def GetCurrentLocationOfMovedIssue(self, cnxn, project_id, local_id):
|
| + """Return the current location of a moved issue based on old location."""
|
| + issue_id = int(self.issueformerlocations_tbl.SelectValue(
|
| + cnxn, 'issue_id', default=0, project_id=project_id, local_id=local_id))
|
| + if not issue_id:
|
| + return None, None
|
| + project_id, local_id = self.issue_tbl.SelectRow(
|
| + cnxn, cols=['project_id', 'local_id'], id=issue_id)
|
| + return project_id, local_id
|
| +
|
| + def GetPreviousLocations(self, cnxn, issue):
|
| + """Get all the previous locations of an issue."""
|
| + location_rows = self.issueformerlocations_tbl.Select(
|
| + cnxn, cols=['project_id', 'local_id'], issue_id=issue.issue_id)
|
| + locations = [(pid, local_id) for (pid, local_id) in location_rows
|
| + if pid != issue.project_id or local_id != issue.local_id]
|
| + return locations
|
| +
|
| + def InsertIssue(self, cnxn, issue):
|
| + """Store the given issue in SQL.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + issue: Issue PB to insert into the database.
|
| +
|
| + Returns:
|
| + The int issue_id of the newly created issue.
|
| + """
|
| + status_id = self._config_service.LookupStatusID(
|
| + cnxn, issue.project_id, issue.status)
|
| + row = (issue.project_id, issue.local_id, status_id,
|
| + issue.owner_id or None,
|
| + issue.reporter_id,
|
| + issue.opened_timestamp,
|
| + issue.closed_timestamp,
|
| + issue.modified_timestamp,
|
| + issue.derived_owner_id or None,
|
| + self._config_service.LookupStatusID(
|
| + cnxn, issue.project_id, issue.derived_status),
|
| + bool(issue.deleted),
|
| + issue.star_count, issue.attachment_count,
|
| + issue.is_spam)
|
| + # ISSUE_COLs[1:] to skip setting the ID
|
| + # Insert into the Master DB.
|
| + generated_ids = self.issue_tbl.InsertRows(
|
| + cnxn, ISSUE_COLS[1:], [row], commit=False, return_generated_ids=True)
|
| + issue_id = generated_ids[0]
|
| + issue.issue_id = issue_id
|
| + self.issue_tbl.Update(
|
| + cnxn, {'shard': issue_id % settings.num_logical_shards},
|
| + id=issue.issue_id, commit=False)
|
| +
|
| + self._UpdateIssuesSummary(cnxn, [issue], commit=False)
|
| + self._UpdateIssuesLabels(
|
| + cnxn, [issue], issue.project_id, commit=False)
|
| + self._UpdateIssuesFields(cnxn, [issue], commit=False)
|
| + self._UpdateIssuesComponents(cnxn, [issue], commit=False)
|
| + self._UpdateIssuesCc(cnxn, [issue], commit=False)
|
| + self._UpdateIssuesNotify(cnxn, [issue], commit=False)
|
| + self._UpdateIssuesRelation(cnxn, [issue], commit=False)
|
| + cnxn.Commit()
|
| + self._config_service.InvalidateMemcache([issue])
|
| +
|
| + return issue_id
|
| +
|
| + def UpdateIssues(
|
| + self, cnxn, issues, update_cols=None, just_derived=False, commit=True,
|
| + invalidate=True):
|
| + """Update the given issues in SQL.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + issues: list of issues to update.
|
| + update_cols: optional list of just the field names to update.
|
| + just_derived: set to True when only updating derived fields.
|
| + commit: set to False to skip the DB commit and do it in the caller.
|
| + invalidate: set to False to leave cache invalidatation to the caller.
|
| + """
|
| + if not issues:
|
| + return
|
| +
|
| + project_id = issues[0].project_id # All must be in the same project.
|
| + assert all(issue.project_id == project_id for issue in issues)
|
| +
|
| + for issue in issues: # slow, but mysql will not allow REPLACE rows.
|
| + delta = {
|
| + 'project_id': issue.project_id,
|
| + 'local_id': issue.local_id,
|
| + 'owner_id': issue.owner_id or None,
|
| + 'status_id': self._config_service.LookupStatusID(
|
| + cnxn, issue.project_id, issue.status) or None,
|
| + 'opened': issue.opened_timestamp,
|
| + 'closed': issue.closed_timestamp,
|
| + 'modified': issue.modified_timestamp,
|
| + 'derived_owner_id': issue.derived_owner_id or None,
|
| + 'derived_status_id': self._config_service.LookupStatusID(
|
| + cnxn, issue.project_id, issue.derived_status) or None,
|
| + 'deleted': bool(issue.deleted),
|
| + 'star_count': issue.star_count,
|
| + 'attachment_count': issue.attachment_count,
|
| + 'is_spam': issue.is_spam,
|
| + }
|
| + if update_cols is not None:
|
| + delta = {key: val for key, val in delta.iteritems()
|
| + if key in update_cols}
|
| + self.issue_tbl.Update(cnxn, delta, id=issue.issue_id, commit=False)
|
| +
|
| + if not update_cols:
|
| + self._UpdateIssuesLabels(
|
| + cnxn, issues, project_id, commit=False)
|
| + self._UpdateIssuesCc(cnxn, issues, commit=False)
|
| + self._UpdateIssuesFields(cnxn, issues, commit=False)
|
| + self._UpdateIssuesComponents(cnxn, issues, commit=False)
|
| + self._UpdateIssuesNotify(cnxn, issues, commit=False)
|
| + if not just_derived:
|
| + self._UpdateIssuesSummary(cnxn, issues, commit=False)
|
| + self._UpdateIssuesRelation(cnxn, issues, commit=False)
|
| +
|
| + iids_to_invalidate = [issue.issue_id for issue in issues]
|
| + if just_derived and invalidate:
|
| + self.issue_2lc.InvalidateAllKeys(cnxn, iids_to_invalidate)
|
| + elif invalidate:
|
| + self.issue_2lc.InvalidateKeys(cnxn, iids_to_invalidate)
|
| + if commit:
|
| + cnxn.Commit()
|
| + if invalidate:
|
| + self._config_service.InvalidateMemcache(issues)
|
| +
|
| + def UpdateIssue(
|
| + self, cnxn, issue, update_cols=None, just_derived=False, commit=True,
|
| + invalidate=True):
|
| + """Update the given issue in SQL.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + issue: the issue to update.
|
| + update_cols: optional list of just the field names to update.
|
| + just_derived: set to True when only updating derived fields.
|
| + commit: set to False to skip the DB commit and do it in the caller.
|
| + invalidate: set to False to leave cache invalidatation to the caller.
|
| + """
|
| + self.UpdateIssues(
|
| + cnxn, [issue], update_cols=update_cols, just_derived=just_derived,
|
| + commit=commit, invalidate=invalidate)
|
| +
|
| + def _UpdateIssuesSummary(self, cnxn, issues, commit=True):
|
| + """Update the IssueSummary table rows for the given issues."""
|
| + self.issuesummary_tbl.InsertRows(
|
| + cnxn, ISSUESUMMARY_COLS,
|
| + [(issue.issue_id, issue.summary) for issue in issues],
|
| + replace=True, commit=commit)
|
| +
|
| + def _UpdateIssuesLabels(self, cnxn, issues, project_id, commit=True):
|
| + """Update the Issue2Label table rows for the given issues."""
|
| + label_rows = []
|
| + for issue in issues:
|
| + issue_shard = issue.issue_id % settings.num_logical_shards
|
| + # TODO(jrobbins): If the user adds many novel labels in one issue update,
|
| + # that could be slow. Solution is to add all new labels in a batch first.
|
| + label_rows.extend(
|
| + (issue.issue_id,
|
| + self._config_service.LookupLabelID(cnxn, project_id, label), False,
|
| + issue_shard)
|
| + for label in issue.labels)
|
| + label_rows.extend(
|
| + (issue.issue_id,
|
| + self._config_service.LookupLabelID(cnxn, project_id, label), True,
|
| + issue_shard)
|
| + for label in issue.derived_labels)
|
| +
|
| + self.issue2label_tbl.Delete(
|
| + cnxn, issue_id=[issue.issue_id for issue in issues],
|
| + commit=False)
|
| + self.issue2label_tbl.InsertRows(
|
| + cnxn, ISSUE2LABEL_COLS + ['issue_shard'],
|
| + label_rows, ignore=True, commit=commit)
|
| +
|
| + def _UpdateIssuesFields(self, cnxn, issues, commit=True):
|
| + """Update the Issue2FieldValue table rows for the given issues."""
|
| + fieldvalue_rows = []
|
| + for issue in issues:
|
| + issue_shard = issue.issue_id % settings.num_logical_shards
|
| + for fv in issue.field_values:
|
| + fieldvalue_rows.append(
|
| + (issue.issue_id, fv.field_id, fv.int_value, fv.str_value,
|
| + fv.user_id or None, fv.derived, issue_shard))
|
| +
|
| + self.issue2fieldvalue_tbl.Delete(
|
| + cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
|
| + self.issue2fieldvalue_tbl.InsertRows(
|
| + cnxn, ISSUE2FIELDVALUE_COLS + ['issue_shard'],
|
| + fieldvalue_rows, commit=commit)
|
| +
|
| + def _UpdateIssuesComponents(self, cnxn, issues, commit=True):
|
| + """Update the Issue2Component table rows for the given issues."""
|
| + issue2component_rows = []
|
| + for issue in issues:
|
| + issue_shard = issue.issue_id % settings.num_logical_shards
|
| + issue2component_rows.extend(
|
| + (issue.issue_id, component_id, False, issue_shard)
|
| + for component_id in issue.component_ids)
|
| + issue2component_rows.extend(
|
| + (issue.issue_id, component_id, True, issue_shard)
|
| + for component_id in issue.derived_component_ids)
|
| +
|
| + self.issue2component_tbl.Delete(
|
| + cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
|
| + self.issue2component_tbl.InsertRows(
|
| + cnxn, ISSUE2COMPONENT_COLS + ['issue_shard'],
|
| + issue2component_rows, ignore=True, commit=commit)
|
| +
|
| + def _UpdateIssuesCc(self, cnxn, issues, commit=True):
|
| + """Update the Issue2Cc table rows for the given issues."""
|
| + cc_rows = []
|
| + for issue in issues:
|
| + issue_shard = issue.issue_id % settings.num_logical_shards
|
| + cc_rows.extend(
|
| + (issue.issue_id, cc_id, False, issue_shard)
|
| + for cc_id in issue.cc_ids)
|
| + cc_rows.extend(
|
| + (issue.issue_id, cc_id, True, issue_shard)
|
| + for cc_id in issue.derived_cc_ids)
|
| +
|
| + self.issue2cc_tbl.Delete(
|
| + cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
|
| + self.issue2cc_tbl.InsertRows(
|
| + cnxn, ISSUE2CC_COLS + ['issue_shard'],
|
| + cc_rows, ignore=True, commit=commit)
|
| +
|
| + def _UpdateIssuesNotify(self, cnxn, issues, commit=True):
|
| + """Update the Issue2Notify table rows for the given issues."""
|
| + notify_rows = []
|
| + for issue in issues:
|
| + derived_rows = [[issue.issue_id, email]
|
| + for email in issue.derived_notify_addrs]
|
| + notify_rows.extend(derived_rows)
|
| +
|
| + self.issue2notify_tbl.Delete(
|
| + cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
|
| + self.issue2notify_tbl.InsertRows(
|
| + cnxn, ISSUE2NOTIFY_COLS, notify_rows, ignore=True, commit=commit)
|
| +
|
| + def _UpdateIssuesRelation(self, cnxn, issues, commit=True):
|
| + """Update the IssueRelation table rows for the given issues."""
|
| + relation_rows = []
|
| + dangling_relation_rows = []
|
| + for issue in issues:
|
| + for dst_issue_id in issue.blocked_on_iids:
|
| + relation_rows.append((issue.issue_id, dst_issue_id, 'blockedon'))
|
| + for dst_issue_id in issue.blocking_iids:
|
| + relation_rows.append((dst_issue_id, issue.issue_id, 'blockedon'))
|
| + for dst_ref in issue.dangling_blocked_on_refs:
|
| + dangling_relation_rows.append((
|
| + issue.issue_id, dst_ref.project, dst_ref.issue_id, 'blockedon'))
|
| + for dst_ref in issue.dangling_blocking_refs:
|
| + dangling_relation_rows.append((
|
| + issue.issue_id, dst_ref.project, dst_ref.issue_id, 'blocking'))
|
| + if issue.merged_into:
|
| + relation_rows.append((issue.issue_id, issue.merged_into, 'mergedinto'))
|
| +
|
| + self.issuerelation_tbl.Delete(
|
| + cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
|
| + self.issuerelation_tbl.Delete(
|
| + cnxn, dst_issue_id=[issue.issue_id for issue in issues],
|
| + kind='blockedon', commit=False)
|
| + self.issuerelation_tbl.InsertRows(
|
| + cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
|
| + self.danglingrelation_tbl.Delete(
|
| + cnxn, issue_id=[issue.issue_id for issue in issues], commit=False)
|
| + self.danglingrelation_tbl.InsertRows(
|
| + cnxn, DANGLINGRELATION_COLS, dangling_relation_rows, ignore=True,
|
| + commit=commit)
|
| +
|
| + def _UpdateIssuesModified(
|
| + self, cnxn, iids, modified_timestamp=None, invalidate=True):
|
| + """Store a modified timestamp for each of the specified issues."""
|
| + delta = {'modified': modified_timestamp or int(time.time())}
|
| + self.issue_tbl.Update(cnxn, delta, id=iids, commit=False)
|
| + if invalidate:
|
| + self.InvalidateIIDs(cnxn, iids)
|
| +
|
| + def DeltaUpdateIssue(
|
| + self, cnxn, services, reporter_id, project_id,
|
| + config, issue, status, owner_id, cc_add, cc_remove, comp_ids_add,
|
| + comp_ids_remove, labels_add, labels_remove, field_vals_add,
|
| + field_vals_remove, fields_clear, blocked_on_add=None,
|
| + blocked_on_remove=None, blocking_add=None, blocking_remove=None,
|
| + merged_into=None, index_now=False, comment=None, summary=None,
|
| + iids_to_invalidate=None, rules=None, predicate_asts=None,
|
| + timestamp=None):
|
| + """Update the issue in the database and return a set of update tuples.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + services: connections to persistence layer.
|
| + reporter_id: user ID of the user making this change.
|
| + project_id: int ID for the current project.
|
| + config: ProjectIssueConfig PB for this project.
|
| + issue: Issue PB of issue to update.
|
| + status: new issue status string, if a change is desired.
|
| + owner_id: user ID of the new issue owner, if a change is desired.
|
| + cc_add: list of user IDs of users to add to CC list.
|
| + cc_remove: list of user IDs of users to remove from CC list.
|
| + comp_ids_add: list of component IDs to add to the issue.
|
| + comp_ids_remove: list of component IDs to remove from the issue.
|
| + labels_add: list of issue label strings to add.
|
| + labels_remove: list of issue label strings to remove.
|
| + field_vals_add: dict of FieldValue PBs to add.
|
| + field_vals_remove: list of FieldValue PBs to remove.
|
| + fields_clear: list of custom field IDs to clear.
|
| + blocked_on_add: list of IIDs that this issue is now blocked on.
|
| + blocked_on_remove: list of IIDs that this issue is no longer blocked on.
|
| + blocking_add: list of IIDs that this issue is blocking.
|
| + blocking_remove: list of IIDs that this issue is no longer blocking.
|
| + merged_into: IID of issue that this issue was merged into, 0 to clear,
|
| + or None for no change.
|
| + index_now: True if the issue should be updated in the full text index.
|
| + comment: This should be the content of the comment
|
| + corresponding to this change.
|
| + summary: new issue summary, currently only used by GData API.
|
| + rules: optional list of preloaded FilterRule PBs for this project.
|
| + predicate_asts: optional list of QueryASTs for the rules. If rules are
|
| + provided, then predicate_asts should also be provided.
|
| + timestamp: int timestamp set during testing, otherwise defaults to
|
| + int(time.time()).
|
| +
|
| + Returns:
|
| + A list of Amendment PBs that describe the set of metadata updates that
|
| + the user made. This tuple is later used in making the IssueComment.
|
| + """
|
| + old_effective_status = tracker_bizobj.GetStatus(issue)
|
| +
|
| + # Make all user input safe to echo out again later.
|
| + status = framework_bizobj.CanonicalizeLabel(status)
|
| + labels_add = [framework_bizobj.CanonicalizeLabel(l) for l in labels_add]
|
| + labels_add = [l for l in labels_add if l]
|
| + labels_remove = [framework_bizobj.CanonicalizeLabel(l)
|
| + for l in labels_remove]
|
| + labels_remove = [l for l in labels_remove if l]
|
| +
|
| + logging.info(
|
| + 'Bulk edit to project_id %s issue.local_id %s',
|
| + project_id, issue.local_id)
|
| + if iids_to_invalidate is None:
|
| + iids_to_invalidate = set([issue.issue_id])
|
| + invalidate = True
|
| + else:
|
| + iids_to_invalidate.add(issue.issue_id)
|
| + invalidate = False # Caller will do it.
|
| +
|
| + # Store each updated value in the issue PB, and compute Update PBs
|
| + amendments = []
|
| + if status is not None and status != issue.status:
|
| + amendments.append(tracker_bizobj.MakeStatusAmendment(
|
| + status, issue.status))
|
| + issue.status = status
|
| + if owner_id is not None and owner_id != issue.owner_id:
|
| + amendments.append(tracker_bizobj.MakeOwnerAmendment(
|
| + owner_id, issue.owner_id))
|
| + issue.owner_id = owner_id
|
| +
|
| + # compute the set of cc'd users added and removed
|
| + cc_add = [cc for cc in cc_add if cc not in issue.cc_ids]
|
| + cc_remove = [cc for cc in cc_remove if cc in issue.cc_ids]
|
| + if cc_add or cc_remove:
|
| + cc_ids = [cc for cc in list(issue.cc_ids) + cc_add
|
| + if cc not in cc_remove]
|
| + issue.cc_ids = cc_ids
|
| + amendments.append(tracker_bizobj.MakeCcAmendment(cc_add, cc_remove))
|
| +
|
| + # compute the set of components added and removed
|
| + comp_ids_add = [c for c in comp_ids_add if c not in issue.component_ids]
|
| + comp_ids_remove = [c for c in comp_ids_remove if c in issue.component_ids]
|
| + if comp_ids_add or comp_ids_remove:
|
| + comp_ids = [cid for cid in list(issue.component_ids) + comp_ids_add
|
| + if cid not in comp_ids_remove]
|
| + issue.component_ids = comp_ids
|
| + amendments.append(tracker_bizobj.MakeComponentsAmendment(
|
| + comp_ids_add, comp_ids_remove, config))
|
| +
|
| + # compute the set of labels added and removed
|
| + (labels, update_labels_add,
|
| + update_labels_remove) = framework_bizobj.MergeLabels(
|
| + issue.labels, labels_add, labels_remove,
|
| + config.exclusive_label_prefixes)
|
| +
|
| + if update_labels_add or update_labels_remove:
|
| + issue.labels = labels
|
| + amendments.append(tracker_bizobj.MakeLabelsAmendment(
|
| + update_labels_add, update_labels_remove))
|
| +
|
| + # compute the set of custom fields added and removed
|
| + (field_vals, update_fields_add,
|
| + update_fields_remove) = tracker_bizobj.MergeFields(
|
| + issue.field_values, field_vals_add, field_vals_remove,
|
| + config.field_defs)
|
| +
|
| + if update_fields_add or update_fields_remove:
|
| + issue.field_values = field_vals
|
| + for fd in config.field_defs:
|
| + added_values_this_field = [
|
| + fv for fv in update_fields_add if fv.field_id == fd.field_id]
|
| + if added_values_this_field:
|
| + amendments.append(tracker_bizobj.MakeFieldAmendment(
|
| + fd.field_id, config,
|
| + [tracker_bizobj.GetFieldValue(fv, {})
|
| + for fv in added_values_this_field],
|
| + old_values=[]))
|
| + removed_values_this_field = [
|
| + fv for fv in update_fields_remove if fv.field_id == fd.field_id]
|
| + if removed_values_this_field:
|
| + amendments.append(tracker_bizobj.MakeFieldAmendment(
|
| + fd.field_id, config, [],
|
| + old_values=[tracker_bizobj.GetFieldValue(fv, {})
|
| + for fv in removed_values_this_field]))
|
| +
|
| + if fields_clear:
|
| + field_clear_set = set(fields_clear)
|
| + revised_fields = []
|
| + for fd in config.field_defs:
|
| + if fd.field_id not in field_clear_set:
|
| + revised_fields.extend(
|
| + fv for fv in issue.field_values if fv.field_id == fd.field_id)
|
| + else:
|
| + amendments.append(
|
| + tracker_bizobj.MakeFieldClearedAmendment(fd.field_id, config))
|
| + if fd.field_type == tracker_pb2.FieldTypes.ENUM_TYPE:
|
| + prefix = fd.field_name.lower() + '-'
|
| + filtered_labels = [
|
| + lab for lab in issue.labels
|
| + if not lab.lower().startswith(prefix)]
|
| + issue.labels = filtered_labels
|
| +
|
| + issue.field_values = revised_fields
|
| +
|
| + if blocked_on_add or blocked_on_remove:
|
| + old_blocked_on = issue.blocked_on_iids
|
| + blocked_on_add = [iid for iid in blocked_on_add
|
| + if iid not in old_blocked_on]
|
| + add_refs = [(ref_issue.project_name, ref_issue.local_id)
|
| + for ref_issue in self.GetIssues(cnxn, blocked_on_add)]
|
| + blocked_on_rm = [iid for iid in blocked_on_remove
|
| + if iid in old_blocked_on]
|
| + remove_refs = [
|
| + (ref_issue.project_name, ref_issue.local_id)
|
| + for ref_issue in self.GetIssues(cnxn, blocked_on_rm)]
|
| + amendments.append(tracker_bizobj.MakeBlockedOnAmendment(
|
| + add_refs, remove_refs, default_project_name=issue.project_name))
|
| + blocked_on = [iid for iid in old_blocked_on + blocked_on_add
|
| + if iid not in blocked_on_remove]
|
| + issue.blocked_on_iids = blocked_on
|
| + iids_to_invalidate.update(blocked_on_add + blocked_on_remove)
|
| +
|
| + if blocking_add or blocking_remove:
|
| + old_blocking = issue.blocking_iids
|
| + blocking_add = [iid for iid in blocking_add
|
| + if iid not in old_blocking]
|
| + add_refs = [(ref_issue.project_name, ref_issue.local_id)
|
| + for ref_issue in self.GetIssues(cnxn, blocking_add)]
|
| + blocking_remove = [iid for iid in blocking_remove
|
| + if iid in old_blocking]
|
| + remove_refs = [
|
| + (ref_issue.project_name, ref_issue.local_id)
|
| + for ref_issue in self.GetIssues(cnxn, blocking_remove)]
|
| + amendments.append(tracker_bizobj.MakeBlockingAmendment(
|
| + add_refs, remove_refs, default_project_name=issue.project_name))
|
| + blocking_refs = [iid for iid in old_blocking + blocking_add
|
| + if iid not in blocking_remove]
|
| + issue.blocking_iids = blocking_refs
|
| + iids_to_invalidate.update(blocking_add + blocking_remove)
|
| +
|
| + if merged_into is not None and merged_into != issue.merged_into:
|
| + merged_remove = issue.merged_into
|
| + merged_add = merged_into
|
| + issue.merged_into = merged_into
|
| + try:
|
| + remove_issue = self.GetIssue(cnxn, merged_remove)
|
| + remove_ref = remove_issue.project_name, remove_issue.local_id
|
| + iids_to_invalidate.add(merged_remove)
|
| + except NoSuchIssueException:
|
| + remove_ref = None
|
| +
|
| + try:
|
| + add_issue = self.GetIssue(cnxn, merged_add)
|
| + add_ref = add_issue.project_name, add_issue.local_id
|
| + iids_to_invalidate.add(merged_add)
|
| + except NoSuchIssueException:
|
| + add_ref = None
|
| +
|
| + amendments.append(tracker_bizobj.MakeMergedIntoAmendment(
|
| + add_ref, remove_ref, default_project_name=issue.project_name))
|
| +
|
| + if summary and summary != issue.summary:
|
| + amendments.append(tracker_bizobj.MakeSummaryAmendment(
|
| + summary, issue.summary))
|
| + issue.summary = summary
|
| +
|
| + # If this was a no-op with no comment, bail out and don't save,
|
| + # invalidate, or re-index anything.
|
| + if not amendments and (not comment or not comment.strip()):
|
| + return [], None
|
| +
|
| + # Note: no need to check for collisions when the user is doing a delta.
|
| +
|
| + # update the modified_timestamp for any comment added, even if it was
|
| + # just a text comment with no issue fields changed.
|
| + issue.modified_timestamp = timestamp or int(time.time())
|
| +
|
| + # Update the closed timestamp before filter rules so that rules
|
| + # can test for closed_timestamp, and also after filter rules
|
| + # so that closed_timestamp will be set if the issue is closed by the rule.
|
| + _UpdateClosedTimestamp(config, issue, old_effective_status)
|
| + if rules is None:
|
| + logging.info('Rules were not given')
|
| + rules = services.features.GetFilterRules(cnxn, config.project_id)
|
| + predicate_asts = filterrules_helpers.ParsePredicateASTs(
|
| + rules, config, None)
|
| +
|
| + filterrules_helpers.ApplyGivenRules(
|
| + cnxn, services, issue, config, rules, predicate_asts)
|
| + _UpdateClosedTimestamp(config, issue, old_effective_status)
|
| +
|
| + # Store the issue in SQL.
|
| + self.UpdateIssue(cnxn, issue, commit=False, invalidate=False)
|
| +
|
| + comment_pb = self.CreateIssueComment(
|
| + cnxn, project_id, issue.local_id, reporter_id, comment,
|
| + amendments=amendments, commit=False)
|
| + self._UpdateIssuesModified(
|
| + cnxn, iids_to_invalidate, modified_timestamp=issue.modified_timestamp,
|
| + invalidate=invalidate)
|
| +
|
| + if not invalidate:
|
| + cnxn.Commit()
|
| +
|
| + if index_now:
|
| + tracker_fulltext.IndexIssues(
|
| + cnxn, [issue], services.user_service, self, self._config_service)
|
| +
|
| + return amendments, comment_pb
|
| +
|
| + def InvalidateIIDs(self, cnxn, iids_to_invalidate):
|
| + """Invalidate the specified issues in the Invalidate table and memcache."""
|
| + issues_to_invalidate = self.GetIssues(cnxn, iids_to_invalidate)
|
| + self.issue_2lc.InvalidateKeys(cnxn, iids_to_invalidate)
|
| + self._config_service.InvalidateMemcache(issues_to_invalidate)
|
| +
|
| + def ApplyIssueComment(
|
| + self, cnxn, services, reporter_id, project_id,
|
| + local_id, summary, status, owner_id, cc_ids, labels, field_values,
|
| + component_ids, blocked_on, blocking, dangling_blocked_on_refs,
|
| + dangling_blocking_refs, merged_into, index_now=True,
|
| + page_gen_ts=None, comment=None, inbound_message=None, attachments=None,
|
| + timestamp=None):
|
| + """Update the issue in the database and return info for notifications.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + services: connection to persistence layer.
|
| + reporter_id: user ID of the user making this change.
|
| + project_id: int Project ID for the current project.
|
| + local_id: integer local ID of the issue to update.
|
| + summary: new issue summary string.
|
| + status: new issue status string.
|
| + owner_id: user ID of the new issue owner.
|
| + cc_ids: list of user IDs of users to CC when the issue changes.
|
| + labels: list of new issue label strings.
|
| + field_values: list of FieldValue PBs.
|
| + component_ids: list of int component IDs.
|
| + blocked_on: list of IIDs that this issue is blocked on.
|
| + blocking: list of IIDs that this issue is blocking.
|
| + dangling_blocked_on_refs: list of Codesite issues this is blocked on.
|
| + dangling_blocking_refs: list of Codesite issues this is blocking.
|
| + merged_into: IID of issue that this issue was merged into, 0 to clear.
|
| + index_now: True if the issue should be updated in the full text index.
|
| + page_gen_ts: time at which the issue HTML page was generated,
|
| + used in detecting mid-air collisions.
|
| + comment: This should be the content of the comment
|
| + corresponding to this change.
|
| + inbound_message: optional string full text of an email that caused
|
| + this comment to be added.
|
| + attachments: This should be a list of
|
| + [(filename, contents, mimetype),...] attachments uploaded at
|
| + the time the comment was made.
|
| + timestamp: int timestamp set during testing, otherwise defaults to
|
| + int(time.time()).
|
| +
|
| + Returns:
|
| + (amendments, comment_pb). Amendments is a list of Amendment PBs
|
| + that describe the set of metadata updates that the user made.
|
| + Comment_pb is the IssueComment for the change.
|
| +
|
| + Raises:
|
| + MidAirCollisionException: indicates that the issue has been
|
| + changed since the user loaded the page.
|
| + """
|
| + status = framework_bizobj.CanonicalizeLabel(status)
|
| + labels = [framework_bizobj.CanonicalizeLabel(l) for l in labels]
|
| + labels = [l for l in labels if l]
|
| +
|
| + # Use canonical label names
|
| + label_ids = self._config_service.LookupLabelIDs(
|
| + cnxn, project_id, labels, autocreate=True)
|
| + labels = [self._config_service.LookupLabel(cnxn, project_id, l_id)
|
| + for l_id in label_ids]
|
| +
|
| + # Get the issue and project configurations.
|
| + config = self._config_service.GetProjectConfig(cnxn, project_id)
|
| + issue = self.GetIssueByLocalID(cnxn, project_id, local_id)
|
| +
|
| + # Store each updated value in the issue PB, and compute amendments
|
| + amendments = []
|
| + iids_to_invalidate = set()
|
| +
|
| + if summary and summary != issue.summary:
|
| + amendments.append(tracker_bizobj.MakeSummaryAmendment(
|
| + summary, issue.summary))
|
| + issue.summary = summary
|
| +
|
| + old_effective_status = tracker_bizobj.GetStatus(issue)
|
| + if status != issue.status:
|
| + amendments.append(tracker_bizobj.MakeStatusAmendment(
|
| + status, issue.status))
|
| + issue.status = status
|
| +
|
| + if owner_id != issue.owner_id:
|
| + amendments.append(tracker_bizobj.MakeOwnerAmendment(
|
| + owner_id, issue.owner_id))
|
| + if owner_id == framework_constants.NO_USER_SPECIFIED:
|
| + issue.reset('owner_id')
|
| + else:
|
| + issue.owner_id = owner_id
|
| +
|
| + # TODO(jrobbins): factor the CC code into a method and add a test
|
| + # compute the set of cc'd users added and removed
|
| + cc_added = [cc for cc in cc_ids if cc not in issue.cc_ids]
|
| + cc_removed = [cc for cc in issue.cc_ids if cc not in cc_ids]
|
| + if cc_added or cc_removed:
|
| + amendments.append(tracker_bizobj.MakeCcAmendment(cc_added, cc_removed))
|
| + issue.cc_ids = cc_ids
|
| +
|
| + # TODO(jrobbins): factor the labels code into a method and add a test
|
| + # compute the set of labels added and removed
|
| + labels_added = [lab for lab in labels
|
| + if lab not in issue.labels]
|
| + labels_removed = [lab for lab in issue.labels
|
| + if lab not in labels]
|
| + if labels_added or labels_removed:
|
| + amendments.append(tracker_bizobj.MakeLabelsAmendment(
|
| + labels_added, labels_removed))
|
| + issue.labels = labels
|
| +
|
| + old_field_values = collections.defaultdict(list)
|
| + for ofv in issue.field_values:
|
| + # Passing {} because I just want the user_id, not the email address.
|
| + old_field_values[ofv.field_id].append(
|
| + tracker_bizobj.GetFieldValue(ofv, {}))
|
| + for field_id, values in old_field_values.iteritems():
|
| + old_field_values[field_id] = sorted(values)
|
| +
|
| + new_field_values = collections.defaultdict(list)
|
| + for nfv in field_values:
|
| + new_field_values[nfv.field_id].append(
|
| + tracker_bizobj.GetFieldValue(nfv, {}))
|
| + for field_id, values in new_field_values.iteritems():
|
| + new_field_values[field_id] = sorted(values)
|
| +
|
| + field_ids_added = {fv.field_id for fv in field_values
|
| + if fv.field_id not in old_field_values}
|
| + field_ids_removed = {ofv.field_id for ofv in issue.field_values
|
| + if ofv.field_id not in new_field_values}
|
| + field_ids_changed = {
|
| + fv.field_id for fv in field_values
|
| + if (fv.field_id in old_field_values and
|
| + old_field_values[fv.field_id] != new_field_values[fv.field_id])}
|
| +
|
| + if field_ids_added or field_ids_removed or field_ids_changed:
|
| + amendments.extend(
|
| + tracker_bizobj.MakeFieldAmendment(fid, config, new_field_values[fid])
|
| + for fid in field_ids_added)
|
| + amendments.extend(
|
| + tracker_bizobj.MakeFieldAmendment(
|
| + fid, config, new_field_values[fid],
|
| + old_values=old_field_values[fid])
|
| + for fid in field_ids_changed)
|
| + amendments.extend(
|
| + tracker_bizobj.MakeFieldAmendment(fid, config, [])
|
| + for fid in field_ids_removed)
|
| +
|
| + issue.field_values = field_values
|
| +
|
| + comps_added = [comp for comp in component_ids
|
| + if comp not in issue.component_ids]
|
| + comps_removed = [comp for comp in issue.component_ids
|
| + if comp not in component_ids]
|
| + if comps_added or comps_removed:
|
| + amendments.append(tracker_bizobj.MakeComponentsAmendment(
|
| + comps_added, comps_removed, config))
|
| + issue.component_ids = component_ids
|
| +
|
| + if merged_into != issue.merged_into:
|
| + # TODO(jrobbins): refactor this into LookupIssueRefByIssueID().
|
| + try:
|
| + merged_remove = self.GetIssue(cnxn, issue.merged_into)
|
| + remove_ref = merged_remove.project_name, merged_remove.local_id
|
| + iids_to_invalidate.add(issue.merged_into)
|
| + except NoSuchIssueException:
|
| + remove_ref = None
|
| +
|
| + try:
|
| + merged_add = self.GetIssue(cnxn, merged_into)
|
| + add_ref = merged_add.project_name, merged_add.local_id
|
| + iids_to_invalidate.add(merged_into)
|
| + except NoSuchIssueException:
|
| + add_ref = None
|
| +
|
| + issue.merged_into = merged_into
|
| + amendments.append(tracker_bizobj.MakeMergedIntoAmendment(
|
| + add_ref, remove_ref, default_project_name=issue.project_name))
|
| +
|
| + blockers_added, blockers_removed = framework_helpers.ComputeListDeltas(
|
| + issue.blocked_on_iids, blocked_on)
|
| + danglers_added, danglers_removed = framework_helpers.ComputeListDeltas(
|
| + issue.dangling_blocked_on_refs, dangling_blocked_on_refs)
|
| + blocked_add_issues = []
|
| + blocked_remove_issues = []
|
| + if blockers_added or blockers_removed or danglers_added or danglers_removed:
|
| + blocked_add_issues = self.GetIssues(cnxn, blockers_added)
|
| + add_refs = [(ref_issue.project_name, ref_issue.local_id)
|
| + for ref_issue in blocked_add_issues]
|
| + add_refs.extend([(ref.project, ref.issue_id) for ref in danglers_added])
|
| + blocked_remove_issues = self.GetIssues(cnxn, blockers_removed)
|
| + remove_refs = [
|
| + (ref_issue.project_name, ref_issue.local_id)
|
| + for ref_issue in blocked_remove_issues]
|
| + remove_refs.extend([(ref.project, ref.issue_id)
|
| + for ref in danglers_removed])
|
| + amendments.append(tracker_bizobj.MakeBlockedOnAmendment(
|
| + add_refs, remove_refs, default_project_name=issue.project_name))
|
| + issue.blocked_on_iids = blocked_on
|
| + issue.dangling_blocked_on_refs = dangling_blocked_on_refs
|
| + iids_to_invalidate.update(blockers_added + blockers_removed)
|
| +
|
| + blockers_added, blockers_removed = framework_helpers.ComputeListDeltas(
|
| + issue.blocking_iids, blocking)
|
| + danglers_added, danglers_removed = framework_helpers.ComputeListDeltas(
|
| + issue.dangling_blocking_refs, dangling_blocking_refs)
|
| + blocking_add_issues = []
|
| + blocking_remove_issues = []
|
| + if blockers_added or blockers_removed or danglers_added or danglers_removed:
|
| + blocking_add_issues = self.GetIssues(cnxn, blockers_added)
|
| + add_refs = [(ref_issue.project_name, ref_issue.local_id)
|
| + for ref_issue in blocking_add_issues]
|
| + add_refs.extend([(ref.project, ref.issue_id) for ref in danglers_added])
|
| + blocking_remove_issues = self.GetIssues(cnxn, blockers_removed)
|
| + remove_refs = [
|
| + (ref_issue.project_name, ref_issue.local_id)
|
| + for ref_issue in blocking_remove_issues]
|
| + remove_refs.extend([(ref.project, ref.issue_id)
|
| + for ref in danglers_removed])
|
| + amendments.append(tracker_bizobj.MakeBlockingAmendment(
|
| + add_refs, remove_refs, default_project_name=issue.project_name))
|
| + issue.blocking_iids = blocking
|
| + issue.dangling_blocking_refs = dangling_blocking_refs
|
| + iids_to_invalidate.update(blockers_added + blockers_removed)
|
| +
|
| + logging.info('later amendments so far is %r', amendments)
|
| +
|
| + # Raise an exception if the issue was changed by another user
|
| + # while this user was viewing/editing the issue.
|
| + if page_gen_ts and amendments:
|
| + # The issue timestamp is stored in seconds, convert to microseconds to
|
| + # match the page_gen_ts.
|
| + issue_ts = issue.modified_timestamp * 1000000
|
| + if issue_ts > page_gen_ts:
|
| + logging.info('%d > %d', issue_ts, page_gen_ts)
|
| + logging.info('amendments: %s', amendments)
|
| + # Forget all the modificiations made to this issue in RAM.
|
| + self.issue_2lc.InvalidateKeys(cnxn, [issue.issue_id])
|
| + raise MidAirCollisionException('issue %d' % local_id, local_id)
|
| +
|
| + # update the modified_timestamp for any comment added, even if it was
|
| + # just a text comment with no issue fields changed.
|
| + issue.modified_timestamp = timestamp or int(time.time())
|
| +
|
| + # Update closed_timestamp both before and after filter rules.
|
| + _UpdateClosedTimestamp(config, issue, old_effective_status)
|
| + filterrules_helpers.ApplyFilterRules(cnxn, services, issue, config)
|
| + _UpdateClosedTimestamp(config, issue, old_effective_status)
|
| +
|
| + self.UpdateIssue(cnxn, issue)
|
| + # TODO(jrobbins): only invalidate nonviewable if the following changed:
|
| + # restriction label, owner, cc, or user-type custom field.
|
| + self._config_service.InvalidateMemcache([issue], key_prefix='nonviewable:')
|
| +
|
| + classification = services.spam.ClassifyComment(comment)
|
| +
|
| + label = classification['outputLabel']
|
| + logging.info('comment classification: %s' % classification)
|
| + score = 0
|
| + is_spam = False
|
| + for output in classification['outputMulti']:
|
| + if output['label'] == label:
|
| + score = float(output['score'])
|
| + if label == 'spam' and score > settings.classifier_spam_thresh:
|
| + logging.info('spam comment: %s' % comment)
|
| + is_spam = True
|
| +
|
| + if amendments or (comment and comment.strip()) or attachments:
|
| + logging.info('amendments = %r', amendments)
|
| + comment_pb = self.CreateIssueComment(
|
| + cnxn, project_id, local_id, reporter_id, comment,
|
| + amendments=amendments, attachments=attachments,
|
| + inbound_message=inbound_message, is_spam=is_spam)
|
| + services.spam.RecordClassifierCommentVerdict(
|
| + cnxn, comment_pb, is_spam, score)
|
| + else:
|
| + comment_pb = None
|
| +
|
| + # Add a comment to the newly added issues saying they are now blocking
|
| + # this issue.
|
| + for add_issue in blocked_add_issues:
|
| + self.CreateIssueComment(
|
| + cnxn, add_issue.project_id, add_issue.local_id, reporter_id,
|
| + content='',
|
| + amendments=[tracker_bizobj.MakeBlockingAmendment(
|
| + [(issue.project_name, issue.local_id)], [],
|
| + default_project_name=add_issue.project_name)])
|
| + # Add a comment to the newly removed issues saying they are no longer
|
| + # blocking this issue.
|
| + for remove_issue in blocked_remove_issues:
|
| + self.CreateIssueComment(
|
| + cnxn, remove_issue.project_id, remove_issue.local_id, reporter_id,
|
| + content='',
|
| + amendments=[tracker_bizobj.MakeBlockingAmendment(
|
| + [], [(issue.project_name, issue.local_id)],
|
| + default_project_name=remove_issue.project_name)])
|
| +
|
| + # Add a comment to the newly added issues saying they are now blocked on
|
| + # this issue.
|
| + for add_issue in blocking_add_issues:
|
| + self.CreateIssueComment(
|
| + cnxn, add_issue.project_id, add_issue.local_id, reporter_id,
|
| + content='',
|
| + amendments=[tracker_bizobj.MakeBlockedOnAmendment(
|
| + [(issue.project_name, issue.local_id)], [],
|
| + default_project_name=add_issue.project_name)])
|
| + # Add a comment to the newly removed issues saying they are no longer
|
| + # blocked on this issue.
|
| + for remove_issue in blocking_remove_issues:
|
| + self.CreateIssueComment(
|
| + cnxn, remove_issue.project_id, remove_issue.local_id, reporter_id,
|
| + content='',
|
| + amendments=[tracker_bizobj.MakeBlockedOnAmendment(
|
| + [], [(issue.project_name, issue.local_id)],
|
| + default_project_name=remove_issue.project_name)])
|
| +
|
| + self._UpdateIssuesModified(
|
| + cnxn, iids_to_invalidate, modified_timestamp=issue.modified_timestamp)
|
| +
|
| + if index_now:
|
| + tracker_fulltext.IndexIssues(
|
| + cnxn, [issue], services.user, self, self._config_service)
|
| +
|
| + if is_spam:
|
| + sequence_num = len(self.GetCommentsForIssue(cnxn, issue.issue_id)) - 1
|
| + # Soft-deletes have to have a user ID, so spam comments are
|
| + # just "deleted" by the commenter.
|
| + self.SoftDeleteComment(cnxn, project_id, local_id, sequence_num,
|
| + reporter_id, services.user, is_spam=True)
|
| + return amendments, comment_pb
|
| +
|
| + def RelateIssues(self, cnxn, issue_relation_dict, commit=True):
|
| + """Update the IssueRelation table rows for the given relationships.
|
| +
|
| + issue_relation_dict is a mapping of 'source' issues to 'destination' issues,
|
| + paired with the kind of relationship connecting the two.
|
| + """
|
| + relation_rows = []
|
| + for src_iid, dests in issue_relation_dict.iteritems():
|
| + for dst_iid, kind in dests:
|
| + if kind == 'blocking':
|
| + relation_rows.append((dst_iid, src_iid, 'blockedon'))
|
| + elif kind == 'blockedon' or kind == 'mergedinto':
|
| + relation_rows.append((src_iid, dst_iid, kind))
|
| +
|
| + self.issuerelation_tbl.InsertRows(
|
| + cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit)
|
| +
|
| + def CopyIssues(self, cnxn, dest_project, issues, user_service, copier_id):
|
| + """Copy the given issues into the destination project."""
|
| + created_issues = []
|
| + iids_to_invalidate = set()
|
| +
|
| + for target_issue in issues:
|
| + new_issue = tracker_pb2.Issue()
|
| + new_issue.project_id = dest_project.project_id
|
| + new_issue.project_name = dest_project.project_name
|
| + new_issue.summary = target_issue.summary
|
| + new_issue.labels.extend(target_issue.labels)
|
| + new_issue.field_values.extend(target_issue.field_values)
|
| + new_issue.reporter_id = copier_id
|
| +
|
| + timestamp = int(time.time())
|
| + new_issue.opened_timestamp = timestamp
|
| + new_issue.modified_timestamp = timestamp
|
| +
|
| + target_comments = self.GetCommentsForIssue(cnxn, target_issue.issue_id)
|
| + initial_summary_comment = target_comments[0]
|
| +
|
| + # Note that blocking and merge_into are not copied.
|
| + if target_issue.blocked_on_iids:
|
| + blocked_on = target_issue.blocked_on_iids
|
| + iids_to_invalidate.update(blocked_on)
|
| + new_issue.blocked_on_iids = blocked_on
|
| +
|
| + # Gather list of attachments from the target issue's summary comment.
|
| + # MakeIssueComments expects a list of [(filename, contents, mimetype),...]
|
| + attachments = []
|
| + for attachment in initial_summary_comment.attachments:
|
| + object_path = ('/' + app_identity.get_default_gcs_bucket_name() +
|
| + attachment.gcs_object_id)
|
| + with cloudstorage.open(object_path, 'r') as f:
|
| + content = f.read()
|
| + attachments.append(
|
| + [attachment.filename, content, attachment.mimetype])
|
| +
|
| + if attachments:
|
| + new_issue.attachment_count = len(attachments)
|
| +
|
| + # Create the same summary comment as the target issue.
|
| + comment = self._MakeIssueComment(
|
| + dest_project.project_id, copier_id, initial_summary_comment.content,
|
| + attachments=attachments, timestamp=timestamp, was_escaped=True)
|
| +
|
| + new_issue.local_id = self.AllocateNextLocalID(
|
| + cnxn, dest_project.project_id)
|
| + issue_id = self.InsertIssue(cnxn, new_issue)
|
| + comment.issue_id = issue_id
|
| + self.InsertComment(cnxn, comment)
|
| +
|
| + if permissions.HasRestrictions(new_issue, 'view'):
|
| + self._config_service.InvalidateMemcache(
|
| + [new_issue], key_prefix='nonviewable:')
|
| +
|
| + tracker_fulltext.IndexIssues(
|
| + cnxn, [new_issue], user_service, self, self._config_service)
|
| + created_issues.append(new_issue)
|
| +
|
| + # The referenced issues are all modified when the relationship is added.
|
| + self._UpdateIssuesModified(
|
| + cnxn, iids_to_invalidate, modified_timestamp=timestamp)
|
| +
|
| + return created_issues
|
| +
|
| + def MoveIssues(self, cnxn, dest_project, issues, user_service):
|
| + """Move the given issues into the destination project."""
|
| + old_location_rows = [
|
| + (issue.issue_id, issue.project_id, issue.local_id)
|
| + for issue in issues]
|
| + moved_back_iids = set()
|
| +
|
| + former_locations_in_project = self.issueformerlocations_tbl.Select(
|
| + cnxn, cols=ISSUEFORMERLOCATIONS_COLS,
|
| + project_id=dest_project.project_id,
|
| + issue_id=[issue.issue_id for issue in issues])
|
| + former_locations = {
|
| + issue_id: local_id
|
| + for issue_id, project_id, local_id in former_locations_in_project}
|
| +
|
| + # Remove the issue id from issue_id_2lc so that it does not stay
|
| + # around in cache and memcache.
|
| + # The Key of IssueIDTwoLevelCache is (project_id, local_id).
|
| + issue_id_2lc_key = (issues[0].project_id, issues[0].local_id)
|
| + self.issue_id_2lc.InvalidateKeys(cnxn, [issue_id_2lc_key])
|
| +
|
| + for issue in issues:
|
| + if issue.issue_id in former_locations:
|
| + dest_id = former_locations[issue.issue_id]
|
| + moved_back_iids.add(issue.issue_id)
|
| + else:
|
| + dest_id = self.AllocateNextLocalID(cnxn, dest_project.project_id)
|
| +
|
| + issue.local_id = dest_id
|
| + issue.project_id = dest_project.project_id
|
| + issue.project_name = dest_project.project_name
|
| +
|
| + # Rewrite each whole issue so that status and label IDs are looked up
|
| + # in the context of the destination project.
|
| + self.UpdateIssues(cnxn, issues)
|
| +
|
| + # Comments also have the project_id because it is needed for an index.
|
| + self.comment_tbl.Update(
|
| + cnxn, {'project_id': dest_project.project_id},
|
| + issue_id=[issue.issue_id for issue in issues], commit=False)
|
| +
|
| + # Record old locations so that we can offer links if the user looks there.
|
| + self.issueformerlocations_tbl.InsertRows(
|
| + cnxn, ISSUEFORMERLOCATIONS_COLS, old_location_rows, ignore=True,
|
| + commit=False)
|
| + cnxn.Commit()
|
| +
|
| + tracker_fulltext.IndexIssues(
|
| + cnxn, issues, user_service, self, self._config_service)
|
| +
|
| + return moved_back_iids
|
| +
|
| + def ExpungeFormerLocations(self, cnxn, project_id):
|
| + """Delete history of issues that were in this project but moved out."""
|
| + self.issueformerlocations_tbl.Delete(cnxn, project_id=project_id)
|
| +
|
| + def ExpungeIssues(self, cnxn, issue_ids):
|
| + """Completely delete the specified issues from the database."""
|
| + logging.info('expunging the issues %r', issue_ids)
|
| + tracker_fulltext.UnindexIssues(issue_ids)
|
| +
|
| + remaining_iids = issue_ids[:]
|
| +
|
| + # Note: these are purposely not done in a transaction to allow
|
| + # incremental progress in what might be a very large change.
|
| + # We are not concerned about non-atomic deletes because all
|
| + # this data will be gone eventually anyway.
|
| + while remaining_iids:
|
| + iids_in_chunk = remaining_iids[:CHUNK_SIZE]
|
| + remaining_iids = remaining_iids[CHUNK_SIZE:]
|
| + self.issuesummary_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.issue2label_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.issue2component_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.issue2cc_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.issue2notify_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.issueupdate_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.attachment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.comment_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.issuerelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.issuerelation_tbl.Delete(cnxn, dst_issue_id=iids_in_chunk)
|
| + self.danglingrelation_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.issueformerlocations_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.reindexqueue_tbl.Delete(cnxn, issue_id=iids_in_chunk)
|
| + self.issue_tbl.Delete(cnxn, id=iids_in_chunk)
|
| +
|
| + def SoftDeleteIssue(self, cnxn, project_id, local_id, deleted, user_service):
|
| + """Set the deleted boolean on the indicated issue and store it.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: int project ID for the current project.
|
| + local_id: int local ID of the issue to freeze/unfreeze.
|
| + deleted: boolean, True to soft-delete, False to undelete.
|
| + user_service: persistence layer for users, used to lookup user IDs.
|
| + """
|
| + issue = self.GetIssueByLocalID(cnxn, project_id, local_id)
|
| + issue.deleted = deleted
|
| + self.UpdateIssue(cnxn, issue, update_cols=['deleted'])
|
| + tracker_fulltext.IndexIssues(
|
| + cnxn, [issue], user_service, self, self._config_service)
|
| +
|
| + def DeleteComponentReferences(self, cnxn, component_id):
|
| + """Delete any references to the specified component."""
|
| + # TODO(jrobbins): add tasks to re-index any affected issues.
|
| + # Note: if this call fails, some data could be left
|
| + # behind, but it would not be displayed, and it could always be
|
| + # GC'd from the DB later.
|
| + self.issue2component_tbl.Delete(cnxn, component_id=component_id)
|
| +
|
| + ### Local ID generation
|
| +
|
| + def InitializeLocalID(self, cnxn, project_id):
|
| + """Initialize the local ID counter for the specified project to zero.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: int ID of the project.
|
| + """
|
| + self.localidcounter_tbl.InsertRow(
|
| + cnxn, project_id=project_id, used_local_id=0, used_spam_id=0)
|
| +
|
| + def SetUsedLocalID(self, cnxn, project_id):
|
| + """Set the local ID counter based on existing issues.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: int ID of the project.
|
| + """
|
| + highest_id = self.GetHighestLocalID(cnxn, project_id)
|
| + self.localidcounter_tbl.Update(
|
| + cnxn, {'used_local_id': highest_id}, project_id=project_id)
|
| + return highest_id
|
| +
|
| + def AllocateNextLocalID(self, cnxn, project_id):
|
| + """Return the next available issue ID in the specified project.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: int ID of the project.
|
| +
|
| + Returns:
|
| + The next local ID.
|
| + """
|
| + try:
|
| + next_local_id = self.localidcounter_tbl.IncrementCounterValue(
|
| + cnxn, 'used_local_id', project_id=project_id)
|
| + except AssertionError:
|
| + next_local_id = self.SetUsedLocalID(cnxn, project_id) + 1
|
| + return next_local_id
|
| +
|
| + def SetUsedSpamID(self, cnxn, project_id):
|
| + """Set the local ID counter based on existing issues.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: int ID of the project.
|
| + """
|
| + current_id = self.localidcounter_tbl.SelectValue(
|
| + cnxn, 'used_spam_id', project_id=project_id)
|
| + current_id = current_id or 0 # Will be None if project has no issues.
|
| +
|
| + self.localidcounter_tbl.Update(
|
| + cnxn, {'used_spam_id': current_id + 1}, project_id=project_id)
|
| + return current_id + 1
|
| +
|
| + def AllocateNextSpamLocalID(self, cnxn, project_id):
|
| + """Return the next available spam issue ID in the specified project.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: int ID of the project.
|
| +
|
| + Returns:
|
| + The next local ID.
|
| + """
|
| + try:
|
| + next_spam_id = self.localidcounter_tbl.IncrementCounterValue(
|
| + cnxn, 'used_spam_id', project_id=project_id)
|
| + except AssertionError:
|
| + next_spam_id = self.SetUsedSpamID(cnxn, project_id) + 1
|
| + return -next_spam_id
|
| +
|
| + def GetHighestLocalID(self, cnxn, project_id):
|
| + """Return the highest used issue ID in the specified project.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: int ID of the project.
|
| +
|
| + Returns:
|
| + The highest local ID for an active or moved issues.
|
| + """
|
| + highest = self.issue_tbl.SelectValue(
|
| + cnxn, 'MAX(local_id)', project_id=project_id)
|
| + highest = highest or 0 # It will be None if the project has no issues.
|
| + highest_former = self.issueformerlocations_tbl.SelectValue(
|
| + cnxn, 'MAX(local_id)', project_id=project_id)
|
| + highest_former = highest_former or 0
|
| + return max(highest, highest_former)
|
| +
|
| + def GetAllLocalIDsInProject(self, cnxn, project_id, min_local_id=None):
|
| + """Return the list of local IDs only, not the actual issues.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: the ID of the project to which the issue belongs.
|
| + min_local_id: point to start at.
|
| +
|
| + Returns:
|
| + A range object of local IDs from 1 to N, or from min_local_id to N. It
|
| + may be the case that some of those local IDs are no longer used, e.g.,
|
| + if some issues were moved out of this project.
|
| + """
|
| + if not min_local_id:
|
| + min_local_id = 1
|
| + highest_local_id = self.GetHighestLocalID(cnxn, project_id)
|
| + return range(min_local_id, highest_local_id + 1)
|
| +
|
| + def ExpungeLocalIDCounters(self, cnxn, project_id):
|
| + """Delete history of local ids that were in this project."""
|
| + self.localidcounter_tbl.Delete(cnxn, project_id=project_id)
|
| +
|
| + ### Comments
|
| +
|
| + def _UnpackComment(self, comment_row):
|
| + """Partially construct a Comment PB from a DB row."""
|
| + (comment_id, issue_id, created, project_id, commenter_id, content,
|
| + inbound_message, was_escaped, deleted_by, is_spam) = comment_row
|
| + comment = tracker_pb2.IssueComment()
|
| + comment.id = comment_id
|
| + comment.issue_id = issue_id
|
| + comment.timestamp = created
|
| + comment.project_id = project_id
|
| + comment.user_id = commenter_id
|
| + comment.content = content or ''
|
| + comment.inbound_message = inbound_message or ''
|
| + comment.was_escaped = bool(was_escaped)
|
| + comment.deleted_by = deleted_by or 0
|
| + comment.is_spam = bool(is_spam)
|
| + return comment
|
| +
|
| + def _UnpackAmendment(self, amendment_row):
|
| + """Construct an Amendment PB from a DB row."""
|
| + (_id, _issue_id, comment_id, field_name,
|
| + old_value, new_value, added_user_id, removed_user_id,
|
| + custom_field_name) = amendment_row
|
| + amendment = tracker_pb2.Amendment()
|
| + field_enum = tracker_pb2.FieldID(field_name.upper())
|
| + amendment.field = field_enum
|
| +
|
| + # TODO(jrobbins): display old values in more cases.
|
| + if new_value is not None:
|
| + amendment.newvalue = new_value
|
| + if old_value is not None:
|
| + amendment.oldvalue = old_value
|
| + if added_user_id:
|
| + amendment.added_user_ids.append(added_user_id)
|
| + if removed_user_id:
|
| + amendment.removed_user_ids.append(removed_user_id)
|
| + if custom_field_name:
|
| + amendment.custom_field_name = custom_field_name
|
| + return amendment, comment_id
|
| +
|
| + def _ConsolidateAmendments(self, amendments):
|
| + """Consoliodate amendments of the same field in one comment into one
|
| + amendment PB."""
|
| +
|
| + fields_dict = {}
|
| + result = []
|
| +
|
| + for amendment in amendments:
|
| + fields_dict.setdefault(amendment.field, []).append(amendment)
|
| + for field, amendments in fields_dict.iteritems():
|
| + new_amendment = tracker_pb2.Amendment()
|
| + new_amendment.field = field
|
| + for amendment in amendments:
|
| + if amendment.newvalue is not None:
|
| + new_amendment.newvalue = amendment.newvalue
|
| + if amendment.oldvalue is not None:
|
| + new_amendment.oldvalue = amendment.oldvalue
|
| + if amendment.added_user_ids:
|
| + new_amendment.added_user_ids.extend(amendment.added_user_ids)
|
| + if amendment.removed_user_ids:
|
| + new_amendment.removed_user_ids.extend(amendment.removed_user_ids)
|
| + if amendment.custom_field_name:
|
| + new_amendment.custom_field_name = amendment.custom_field_name
|
| + result.append(new_amendment)
|
| + return result
|
| +
|
| + def _UnpackAttachment(self, attachment_row):
|
| + """Construct an Attachment PB from a DB row."""
|
| + (attachment_id, _issue_id, comment_id, filename, filesize, mimetype,
|
| + deleted, gcs_object_id) = attachment_row
|
| + attach = tracker_pb2.Attachment()
|
| + attach.attachment_id = attachment_id
|
| + attach.filename = filename
|
| + attach.filesize = filesize
|
| + attach.mimetype = mimetype
|
| + attach.deleted = bool(deleted)
|
| + attach.gcs_object_id = gcs_object_id
|
| + return attach, comment_id
|
| +
|
| + def _DeserializeComments(
|
| + self, comment_rows, amendment_rows, attachment_rows):
|
| + """Turn rows into IssueComment PBs."""
|
| + results = [] # keep objects in the same order as the rows
|
| + results_dict = {} # for fast access when joining.
|
| +
|
| + for comment_row in comment_rows:
|
| + comment = self._UnpackComment(comment_row)
|
| + results.append(comment)
|
| + results_dict[comment.id] = comment
|
| +
|
| + for amendment_row in amendment_rows:
|
| + amendment, comment_id = self._UnpackAmendment(amendment_row)
|
| + try:
|
| + results_dict[comment_id].amendments.extend([amendment])
|
| + except KeyError:
|
| + logging.error('Found amendment for missing comment: %r', comment_id)
|
| +
|
| + for attachment_row in attachment_rows:
|
| + attach, comment_id = self._UnpackAttachment(attachment_row)
|
| + try:
|
| + results_dict[comment_id].attachments.append(attach)
|
| + except KeyError:
|
| + logging.error('Found attachment for missing comment: %r', comment_id)
|
| +
|
| + for c in results:
|
| + c.amendments = self._ConsolidateAmendments(c.amendments)
|
| +
|
| + return results
|
| +
|
| + # TODO(jrobbins): make this a private method and expose just the interface
|
| + # needed by activities.py.
|
| + def GetComments(self, cnxn, where=None, order_by=None, **kwargs):
|
| + """Retrieve comments from SQL."""
|
| + # Explicitly specify column Comment.id to allow joins on other tables that
|
| + # have an id column.
|
| + order_by = order_by or [('created', [])]
|
| + comment_rows = self.comment_tbl.Select(
|
| + cnxn, cols=COMMENT_COLS, where=where,
|
| + order_by=order_by, **kwargs)
|
| + cids = [row[0] for row in comment_rows]
|
| + amendment_rows = self.issueupdate_tbl.Select(
|
| + cnxn, cols=ISSUEUPDATE_COLS, comment_id=cids)
|
| + attachment_rows = self.attachment_tbl.Select(
|
| + cnxn, cols=ATTACHMENT_COLS, comment_id=cids)
|
| +
|
| + comments = self._DeserializeComments(
|
| + comment_rows, amendment_rows, attachment_rows)
|
| + return comments
|
| +
|
| + def GetComment(self, cnxn, comment_id):
|
| + """Get the requested comment, or raise an exception."""
|
| + comments = self.GetComments(cnxn, id=comment_id)
|
| + try:
|
| + return comments[0]
|
| + except IndexError:
|
| + raise NoSuchCommentException()
|
| +
|
| + def GetCommentsForIssue(self, cnxn, issue_id):
|
| + """Return all IssueComment PBs for the specified issue.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + issue_id: int global ID of the issue.
|
| +
|
| + Returns:
|
| + A list of the IssueComment protocol buffers for the description
|
| + and comments on this issue.
|
| + """
|
| + comments = self.GetComments(cnxn, issue_id=[issue_id])
|
| + for i, comment in enumerate(comments):
|
| + comment.sequence = i
|
| +
|
| + return comments
|
| +
|
| + def GetCommentsByID(self, cnxn, comment_ids, sequences):
|
| + """Return all IssueComment PBs by comment ids.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + comment_ids: a list of comment ids.
|
| + sequences: sequence of the comments.
|
| +
|
| + Returns:
|
| + A list of the IssueComment protocol buffers for the description
|
| + and comments on this issue.
|
| + """
|
| + order_by = [('created ASC', [])]
|
| + comment_rows = self.comment_tbl.Select(
|
| + cnxn, cols=COMMENT_COLS, order_by=order_by, id=comment_ids)
|
| + amendment_rows = self.issueupdate_tbl.Select(
|
| + cnxn, cols=ISSUEUPDATE_COLS, comment_id=comment_ids)
|
| + attachment_rows = self.attachment_tbl.Select(
|
| + cnxn, cols=ATTACHMENT_COLS, comment_id=comment_ids)
|
| +
|
| + comments = self._DeserializeComments(
|
| + comment_rows, amendment_rows, attachment_rows)
|
| +
|
| + for i in xrange(len(comment_ids)):
|
| + comments[i].sequence = sequences[i]
|
| +
|
| + return comments
|
| +
|
| + def GetAbbrCommentsForIssue(self, cnxn, issue_id):
|
| + """Get all abbreviated comments for the specified issue."""
|
| + order_by = [('created ASC', [])]
|
| + comment_rows = self.comment_tbl.Select(
|
| + cnxn, cols=ABBR_COMMENT_COLS, issue_id=[issue_id], order_by=order_by)
|
| +
|
| + return comment_rows
|
| +
|
| + # TODO(jrobbins): remove this message because it is too slow when an issue
|
| + # has a huge number of comments.
|
| + def GetCommentsForIssues(self, cnxn, issue_ids):
|
| + """Return all IssueComment PBs for each issue ID in the given list.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + issue_ids: list of integer global issue IDs.
|
| +
|
| + Returns:
|
| + Dict {issue_id: [IssueComment, ...]} with IssueComment protocol
|
| + buffers for the description and comments on each issue.
|
| + """
|
| + comments = self.GetComments(cnxn, issue_id=issue_ids)
|
| +
|
| + comments_dict = collections.defaultdict(list)
|
| + for comment in comments:
|
| + comment.sequence = len(comments_dict[comment.issue_id])
|
| + comments_dict[comment.issue_id].append(comment)
|
| +
|
| + return comments_dict
|
| +
|
| + def InsertComment(self, cnxn, comment, commit=True):
|
| + """Store the given issue comment in SQL.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + comment: IssueComment PB to insert into the database.
|
| + commit: set to False to avoid doing the commit for now.
|
| + """
|
| + comment_id = self.comment_tbl.InsertRow(
|
| + cnxn, issue_id=comment.issue_id, created=comment.timestamp,
|
| + project_id=comment.project_id,
|
| + commenter_id=comment.user_id, content=comment.content,
|
| + inbound_message=comment.inbound_message,
|
| + was_escaped=comment.was_escaped,
|
| + deleted_by=comment.deleted_by or None,
|
| + is_spam=comment.is_spam,
|
| + commit=commit)
|
| + comment.id = comment_id
|
| +
|
| + amendment_rows = []
|
| + for amendment in comment.amendments:
|
| + field_enum = str(amendment.field).lower()
|
| + if (amendment.get_assigned_value('newvalue') is not None and
|
| + not amendment.added_user_ids and not amendment.removed_user_ids):
|
| + amendment_rows.append((
|
| + comment.issue_id, comment_id, field_enum,
|
| + amendment.oldvalue, amendment.newvalue,
|
| + None, None, amendment.custom_field_name))
|
| + for added_user_id in amendment.added_user_ids:
|
| + amendment_rows.append((
|
| + comment.issue_id, comment_id, field_enum, None, None,
|
| + added_user_id, None, amendment.custom_field_name))
|
| + for removed_user_id in amendment.removed_user_ids:
|
| + amendment_rows.append((
|
| + comment.issue_id, comment_id, field_enum, None, None,
|
| + None, removed_user_id, amendment.custom_field_name))
|
| + # ISSUEUPDATE_COLS[1:] to skip id column.
|
| + self.issueupdate_tbl.InsertRows(
|
| + cnxn, ISSUEUPDATE_COLS[1:], amendment_rows, commit=commit)
|
| +
|
| + attachment_rows = []
|
| + for attach in comment.attachments:
|
| + attachment_rows.append([
|
| + comment.issue_id, comment.id, attach.filename, attach.filesize,
|
| + attach.mimetype, attach.deleted, attach.gcs_object_id])
|
| + self.attachment_tbl.InsertRows(
|
| + cnxn, ATTACHMENT_COLS[1:], attachment_rows, commit=commit)
|
| +
|
| + def _UpdateComment(self, cnxn, comment, update_cols=None):
|
| + """Update the given issue comment in SQL.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + comment: IssueComment PB to update in the database.
|
| + update_cols: optional list of just the field names to update.
|
| + """
|
| + delta = {
|
| + 'commenter_id': comment.user_id,
|
| + 'content': comment.content,
|
| + 'deleted_by': comment.deleted_by or None,
|
| + 'is_spam': comment.is_spam,
|
| + }
|
| + if update_cols is not None:
|
| + delta = {key: val for key, val in delta.iteritems()
|
| + if key in update_cols}
|
| +
|
| + self.comment_tbl.Update(cnxn, delta, id=comment.id)
|
| +
|
| + def _MakeIssueComment(
|
| + self, project_id, user_id, content, inbound_message=None,
|
| + amendments=None, attachments=None, timestamp=None, was_escaped=False,
|
| + is_spam=False):
|
| + """Create in IssueComment protocol buffer in RAM.
|
| +
|
| + Args:
|
| + project_id: Project with the issue.
|
| + user_id: the user ID of the user who entered the comment.
|
| + content: string body of the comment.
|
| + inbound_message: optional string full text of an email that
|
| + caused this comment to be added.
|
| + amendments: list of Amendment PBs describing the
|
| + metadata changes that the user made along w/ comment.
|
| + attachments: [(filename, contents, mimetype),...] attachments uploaded at
|
| + the time the comment was made.
|
| + timestamp: time at which the comment was made, defaults to now.
|
| + was_escaped: True if the comment was HTML escaped already.
|
| + is_spam: True if the comment was classified as spam.
|
| + Returns:
|
| + The new IssueComment protocol buffer.
|
| +
|
| + The content may have some markup done during input processing.
|
| +
|
| + Any attachments are immediately stored.
|
| + """
|
| + comment = tracker_pb2.IssueComment()
|
| + comment.project_id = project_id
|
| + comment.user_id = user_id
|
| + comment.content = content or ''
|
| + comment.was_escaped = was_escaped
|
| + comment.is_spam = is_spam
|
| + if not timestamp:
|
| + timestamp = int(time.time())
|
| + comment.timestamp = int(timestamp)
|
| + if inbound_message:
|
| + comment.inbound_message = inbound_message
|
| + if amendments:
|
| + logging.info('amendments is %r', amendments)
|
| + comment.amendments.extend(amendments)
|
| +
|
| + if attachments:
|
| + for filename, body, mimetype in attachments:
|
| + gcs_object_id = gcs_helpers.StoreObjectInGCS(body, mimetype, project_id)
|
| + attach = tracker_pb2.Attachment()
|
| + # attachment id is determined later by the SQL DB.
|
| + attach.filename = filename
|
| + attach.filesize = len(body)
|
| + attach.mimetype = mimetype
|
| + attach.gcs_object_id = gcs_object_id
|
| + comment.attachments.extend([attach])
|
| + logging.info("Save attachment with object_id: %s" % gcs_object_id)
|
| +
|
| + return comment
|
| +
|
| + def CreateIssueComment(
|
| + self, cnxn, project_id, local_id, user_id, content, inbound_message=None,
|
| + amendments=None, attachments=None, timestamp=None, is_spam=False,
|
| + commit=True):
|
| + """Create and store a new comment on the specified issue.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + project_id: int ID of the current Project.
|
| + local_id: the issue on which to add the comment.
|
| + user_id: the user ID of the user who entered the comment.
|
| + content: string body of the comment.
|
| + inbound_message: optional string full text of an email that caused
|
| + this comment to be added.
|
| + amendments: list of Amendment PBs describing the
|
| + metadata changes that the user made along w/ comment.
|
| + attachments: [(filename, contents, mimetype),...] attachments uploaded at
|
| + the time the comment was made.
|
| + timestamp: time at which the comment was made, defaults to now.
|
| + is_spam: True if the comment is classified as spam.
|
| + commit: set to False to not commit to DB yet.
|
| +
|
| + Returns:
|
| + The new IssueComment protocol buffer.
|
| +
|
| + Note that we assume that the content is safe to echo out
|
| + again. The content may have some markup done during input
|
| + processing.
|
| + """
|
| + issue = self.GetIssueByLocalID(cnxn, project_id, local_id)
|
| +
|
| + comment = self._MakeIssueComment(
|
| + issue.project_id, user_id, content, amendments=amendments,
|
| + inbound_message=inbound_message, attachments=attachments,
|
| + timestamp=timestamp, is_spam=is_spam)
|
| + comment.issue_id = issue.issue_id
|
| +
|
| + if attachments:
|
| + issue.attachment_count = issue.attachment_count + len(attachments)
|
| + self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
|
| +
|
| + self.InsertComment(cnxn, comment, commit=commit)
|
| +
|
| + return comment
|
| +
|
| + def SoftDeleteComment(
|
| + self, cnxn, project_id, local_id, sequence_num, deleted_by_user_id,
|
| + user_service, delete=True, reindex=True, is_spam=False):
|
| + """Mark comment as un/deleted, which shows/hides it from average users."""
|
| + issue = self.GetIssueByLocalID(cnxn, project_id, local_id)
|
| +
|
| + all_comments = self.GetCommentsForIssue(cnxn, issue.issue_id)
|
| + try:
|
| + issue_comment = all_comments[sequence_num]
|
| + except IndexError:
|
| + logging.warning(
|
| + 'Tried to (un)delete non-existent comment #%s in issue %s:%s',
|
| + sequence_num, project_id, local_id)
|
| + return
|
| +
|
| + # Update number of attachments
|
| + attachments = 0
|
| + if issue_comment.attachments:
|
| + for attachment in issue_comment.attachments:
|
| + if not attachment.deleted:
|
| + attachments += 1
|
| +
|
| + # Delete only if it's not in deleted state
|
| + if delete:
|
| + if not issue_comment.deleted_by:
|
| + issue_comment.deleted_by = deleted_by_user_id
|
| + issue.attachment_count = issue.attachment_count - attachments
|
| +
|
| + # Undelete only if it's in deleted state
|
| + elif issue_comment.deleted_by:
|
| + issue_comment.deleted_by = 0
|
| + issue.attachment_count = issue.attachment_count + attachments
|
| +
|
| + issue_comment.is_spam = is_spam
|
| + self._UpdateComment(
|
| + cnxn, issue_comment, update_cols=['deleted_by', 'is_spam'])
|
| + self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
|
| +
|
| + # Reindex the issue to take the comment deletion/undeletion into account.
|
| + if reindex:
|
| + tracker_fulltext.IndexIssues(
|
| + cnxn, [issue], user_service, self, self._config_service)
|
| +
|
| + ### Attachments
|
| +
|
| + def GetAttachmentAndContext(self, cnxn, attachment_id):
|
| + """Load a IssueAttachment from database, and its comment ID and IID.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + attachment_id: long integer unique ID of desired issue attachment.
|
| +
|
| + Returns:
|
| + An Attachment protocol buffer that contains metadata about the attached
|
| + file, or None if it doesn't exist. Also, the comment ID and issue IID
|
| + of the comment and issue that contain this attachment.
|
| +
|
| + Raises:
|
| + NoSuchAttachmentException: the attachment was not found.
|
| + """
|
| + if attachment_id is None:
|
| + raise NoSuchAttachmentException()
|
| +
|
| + attachment_row = self.attachment_tbl.SelectRow(
|
| + cnxn, cols=ATTACHMENT_COLS, id=attachment_id)
|
| + if attachment_row:
|
| + (attach_id, issue_id, comment_id, filename, filesize, mimetype,
|
| + deleted, gcs_object_id) = attachment_row
|
| + if not deleted:
|
| + attachment = tracker_pb2.Attachment(
|
| + attachment_id=attach_id, filename=filename, filesize=filesize,
|
| + mimetype=mimetype, deleted=bool(deleted),
|
| + gcs_object_id=gcs_object_id)
|
| + return attachment, comment_id, issue_id
|
| +
|
| + raise NoSuchAttachmentException()
|
| +
|
| + def _UpdateAttachment(self, cnxn, attach, update_cols=None):
|
| + """Update attachment metadata in the DB.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + attach: IssueAttachment PB to update in the DB.
|
| + update_cols: optional list of just the field names to update.
|
| + """
|
| + delta = {
|
| + 'filename': attach.filename,
|
| + 'filesize': attach.filesize,
|
| + 'mimetype': attach.mimetype,
|
| + 'deleted': bool(attach.deleted),
|
| + }
|
| + if update_cols is not None:
|
| + delta = {key: val for key, val in delta.iteritems()
|
| + if key in update_cols}
|
| +
|
| + self.attachment_tbl.Update(cnxn, delta, id=attach.attachment_id)
|
| +
|
| + def SoftDeleteAttachment(
|
| + self, cnxn, project_id, local_id, seq_num, attach_id, user_service,
|
| + delete=True, index_now=True):
|
| + """Mark attachment as un/deleted, which shows/hides it from avg users."""
|
| + issue = self.GetIssueByLocalID(cnxn, project_id, local_id)
|
| + all_comments = self.GetCommentsForIssue(cnxn, issue.issue_id)
|
| + try:
|
| + issue_comment = all_comments[seq_num]
|
| + except IndexError:
|
| + logging.warning(
|
| + 'Tried to (un)delete attachment on non-existent comment #%s in '
|
| + 'issue %s:%s', seq_num, project_id, local_id)
|
| + return
|
| +
|
| + attachment = None
|
| + for attach in issue_comment.attachments:
|
| + if attach.attachment_id == attach_id:
|
| + attachment = attach
|
| +
|
| + if not attachment:
|
| + logging.warning(
|
| + 'Tried to (un)delete non-existent attachment #%s in project '
|
| + '%s issue %s', attach_id, project_id, local_id)
|
| + return
|
| +
|
| + if not issue_comment.deleted_by:
|
| + # Decrement attachment count only if it's not in deleted state
|
| + if delete:
|
| + if not attachment.deleted:
|
| + issue.attachment_count = issue.attachment_count - 1
|
| +
|
| + # Increment attachment count only if it's in deleted state
|
| + elif attachment.deleted:
|
| + issue.attachment_count = issue.attachment_count + 1
|
| +
|
| + attachment.deleted = delete
|
| +
|
| + self._UpdateAttachment(cnxn, attachment, update_cols=['deleted'])
|
| + self.UpdateIssue(cnxn, issue, update_cols=['attachment_count'])
|
| +
|
| + if index_now:
|
| + tracker_fulltext.IndexIssues(
|
| + cnxn, [issue], user_service, self, self._config_service)
|
| +
|
| + ### Reindex queue
|
| +
|
| + def EnqueueIssuesForIndexing(self, cnxn, issue_ids):
|
| + """Add the given issue IDs to the ReindexQueue table."""
|
| + reindex_rows = [(issue_id,) for issue_id in issue_ids]
|
| + self.reindexqueue_tbl.InsertRows(
|
| + cnxn, ['issue_id'], reindex_rows, ignore=True)
|
| +
|
| + def ReindexIssues(self, cnxn, num_to_reindex, user_service):
|
| + """Reindex some issues specified in the IndexQueue table."""
|
| + rows = self.reindexqueue_tbl.Select(
|
| + cnxn, order_by=[('created', [])], limit=num_to_reindex)
|
| + issue_ids = [row[0] for row in rows]
|
| +
|
| + if issue_ids:
|
| + issues = self.GetIssues(cnxn, issue_ids)
|
| + tracker_fulltext.IndexIssues(
|
| + cnxn, issues, user_service, self, self._config_service)
|
| + self.reindexqueue_tbl.Delete(cnxn, issue_id=issue_ids)
|
| +
|
| + return len(issue_ids)
|
| +
|
| + ### Search functions
|
| +
|
| + def RunIssueQuery(
|
| + self, cnxn, left_joins, where, order_by, shard_id=None, limit=None):
|
| + """Run a SQL query to find matching issue IDs.
|
| +
|
| + Args:
|
| + cnxn: connection to SQL database.
|
| + left_joins: list of SQL LEFT JOIN clauses.
|
| + where: list of SQL WHERE clauses.
|
| + order_by: list of SQL ORDER BY clauses.
|
| + shard_id: int shard ID to focus the search.
|
| + limit: int maximum number of results, defaults to
|
| + settings.search_limit_per_shard.
|
| +
|
| + Returns:
|
| + (issue_ids, capped) where issue_ids is a list of the result issue IDs,
|
| + and capped is True if the number of results reached the limit.
|
| + """
|
| + limit = limit or settings.search_limit_per_shard
|
| + where = where + [('Issue.deleted = %s', [False])]
|
| + rows = self.issue_tbl.Select(
|
| + cnxn, shard_id=shard_id, distinct=True, cols=['Issue.id'],
|
| + left_joins=left_joins, where=where, order_by=order_by,
|
| + limit=limit)
|
| + issue_ids = [row[0] for row in rows]
|
| + capped = len(issue_ids) >= limit
|
| + return issue_ids, capped
|
| +
|
| + def GetIIDsByLabelIDs(self, cnxn, label_ids, project_id, shard_id):
|
| + """Return a list of IIDs for issues with any of the given label IDs."""
|
| + where = []
|
| + if shard_id is not None:
|
| + slice_term = ('shard = %s', [shard_id])
|
| + where.append(slice_term)
|
| +
|
| + rows = self.issue_tbl.Select(
|
| + cnxn, shard_id=shard_id, cols=['id'],
|
| + left_joins=[('Issue2Label ON Issue.id = Issue2Label.issue_id', [])],
|
| + label_id=label_ids, project_id=project_id, where=where)
|
| +
|
| + return [row[0] for row in rows]
|
| +
|
| + def GetIIDsByParticipant(self, cnxn, user_ids, project_ids, shard_id):
|
| + """Return IIDs for issues where any of the given users participate."""
|
| + iids = []
|
| + where = []
|
| + if shard_id is not None:
|
| + where.append(('shard = %s', [shard_id]))
|
| + if project_ids:
|
| + cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids)
|
| + where.append((cond_str, project_ids))
|
| +
|
| + # TODO(jrobbins): Combine these 3 queries into one with ORs. It currently
|
| + # is not the bottleneck.
|
| + rows = self.issue_tbl.Select(
|
| + cnxn, cols=['id'], reporter_id=user_ids,
|
| + where=where, shard_id=shard_id)
|
| + for row in rows:
|
| + iids.append(row[0])
|
| +
|
| + rows = self.issue_tbl.Select(
|
| + cnxn, cols=['id'], owner_id=user_ids,
|
| + where=where, shard_id=shard_id)
|
| + for row in rows:
|
| + iids.append(row[0])
|
| +
|
| + rows = self.issue_tbl.Select(
|
| + cnxn, cols=['id'], derived_owner_id=user_ids,
|
| + where=where, shard_id=shard_id)
|
| + for row in rows:
|
| + iids.append(row[0])
|
| +
|
| + rows = self.issue_tbl.Select(
|
| + cnxn, cols=['id'],
|
| + left_joins=[('Issue2Cc ON Issue2Cc.issue_id = Issue.id', [])],
|
| + cc_id=user_ids,
|
| + where=where + [('cc_id IS NOT NULL', [])],
|
| + shard_id=shard_id)
|
| + for row in rows:
|
| + iids.append(row[0])
|
| +
|
| + rows = self.issue_tbl.Select(
|
| + cnxn, cols=['Issue.id'],
|
| + left_joins=[
|
| + ('Issue2FieldValue ON Issue.id = Issue2FieldValue.issue_id', []),
|
| + ('FieldDef ON Issue2FieldValue.field_id = FieldDef.id', [])],
|
| + user_id=user_ids, grants_perm='View',
|
| + where=where + [('user_id IS NOT NULL', [])],
|
| + shard_id=shard_id)
|
| + for row in rows:
|
| + iids.append(row[0])
|
| +
|
| + return iids
|
| +
|
| +
|
| +def _UpdateClosedTimestamp(config, issue, old_effective_status):
|
| + """Sets or unsets the closed_timestamp based based on status changes.
|
| +
|
| + If the status is changing from open to closed, the closed_timestamp is set to
|
| + the current time.
|
| +
|
| + If the status is changing form closed to open, the close_timestamp is unset.
|
| +
|
| + If the status is changing from one closed to another closed, or from one
|
| + open to another open, no operations are performed.
|
| +
|
| + Args:
|
| + config: the project configuration
|
| + issue: the issue being updated (a protocol buffer)
|
| + old_effective_status: the old issue status string. E.g., 'New'
|
| + """
|
| + # open -> closed
|
| + if (tracker_helpers.MeansOpenInProject(old_effective_status, config)
|
| + and not tracker_helpers.MeansOpenInProject(
|
| + tracker_bizobj.GetStatus(issue), config)):
|
| +
|
| + logging.info('setting closed_timestamp on issue: %d', issue.local_id)
|
| +
|
| + issue.closed_timestamp = int(time.time())
|
| + return
|
| +
|
| + # closed -> open
|
| + if (not tracker_helpers.MeansOpenInProject(old_effective_status, config)
|
| + and tracker_helpers.MeansOpenInProject(
|
| + tracker_bizobj.GetStatus(issue), config)):
|
| +
|
| + logging.info('clearing closed_timestamp on issue: %s', issue.local_id)
|
| +
|
| + issue.reset('closed_timestamp')
|
| + return
|
| +
|
| +
|
| +class Error(Exception):
|
| + """Base exception class for this package."""
|
| + pass
|
| +
|
| +
|
| +class NoSuchIssueException(Error):
|
| + """The requested issue was not found."""
|
| + pass
|
| +
|
| +
|
| +class NoSuchAttachmentException(Error):
|
| + """The requested attachment was not found."""
|
| + pass
|
| +
|
| +
|
| +class NoSuchCommentException(Error):
|
| + """The requested comment was not found."""
|
| + pass
|
| +
|
| +
|
| +class MidAirCollisionException(Error):
|
| + """The item was updated by another user at the same time."""
|
| +
|
| + def __init__(self, name, continue_issue_id):
|
| + super(MidAirCollisionException, self).__init__()
|
| + self.name = name # human-readable name for the artifact being edited.
|
| + self.continue_issue_id = continue_issue_id # ID of issue to start over.
|
|
|