Index: appengine/monorail/services/issue_svc.py |
diff --git a/appengine/monorail/services/issue_svc.py b/appengine/monorail/services/issue_svc.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..6a98c459c3472a2978ecb199c8307ddc019bab3e |
--- /dev/null |
+++ b/appengine/monorail/services/issue_svc.py |
@@ -0,0 +1,2557 @@ |
+# Copyright 2016 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style |
+# license that can be found in the LICENSE file or at |
+# https://developers.google.com/open-source/licenses/bsd |
+ |
+"""A set of functions that provide persistence for Monorail issue tracking. |
+ |
+This module provides functions to get, update, create, and (in some |
+cases) delete each type of business object. It provides a logical |
+persistence layer on top of an SQL database. |
+ |
+Business objects are described in tracker_pb2.py and tracker_bizobj.py. |
+""" |
+ |
+import collections |
+import json |
+import logging |
+import os |
+import time |
+import uuid |
+ |
+from google.appengine.api import app_identity |
+from google.appengine.api import images |
+from third_party import cloudstorage |
+ |
+import settings |
+from features import filterrules_helpers |
+from framework import framework_bizobj |
+from framework import framework_constants |
+from framework import framework_helpers |
+from framework import gcs_helpers |
+from framework import permissions |
+from framework import sql |
+from infra_libs import ts_mon |
+from proto import project_pb2 |
+from proto import tracker_pb2 |
+from services import caches |
+from services import tracker_fulltext |
+from tracker import tracker_bizobj |
+from tracker import tracker_helpers |
+ |
+ |
+ISSUE_TABLE_NAME = 'Issue' |
+ISSUESUMMARY_TABLE_NAME = 'IssueSummary' |
+ISSUE2LABEL_TABLE_NAME = 'Issue2Label' |
+ISSUE2COMPONENT_TABLE_NAME = 'Issue2Component' |
+ISSUE2CC_TABLE_NAME = 'Issue2Cc' |
+ISSUE2NOTIFY_TABLE_NAME = 'Issue2Notify' |
+ISSUE2FIELDVALUE_TABLE_NAME = 'Issue2FieldValue' |
+COMMENT_TABLE_NAME = 'Comment' |
+ATTACHMENT_TABLE_NAME = 'Attachment' |
+ISSUERELATION_TABLE_NAME = 'IssueRelation' |
+DANGLINGRELATION_TABLE_NAME = 'DanglingIssueRelation' |
+ISSUEUPDATE_TABLE_NAME = 'IssueUpdate' |
+ISSUEFORMERLOCATIONS_TABLE_NAME = 'IssueFormerLocations' |
+REINDEXQUEUE_TABLE_NAME = 'ReindexQueue' |
+LOCALIDCOUNTER_TABLE_NAME = 'LocalIDCounter' |
+ |
+ISSUE_COLS = [ |
+ 'id', 'project_id', 'local_id', 'status_id', 'owner_id', 'reporter_id', |
+ 'opened', 'closed', 'modified', 'derived_owner_id', 'derived_status_id', |
+ 'deleted', 'star_count', 'attachment_count', 'is_spam'] |
+ISSUESUMMARY_COLS = ['issue_id', 'summary'] |
+ISSUE2LABEL_COLS = ['issue_id', 'label_id', 'derived'] |
+ISSUE2COMPONENT_COLS = ['issue_id', 'component_id', 'derived'] |
+ISSUE2CC_COLS = ['issue_id', 'cc_id', 'derived'] |
+ISSUE2NOTIFY_COLS = ['issue_id', 'email'] |
+ISSUE2FIELDVALUE_COLS = [ |
+ 'issue_id', 'field_id', 'int_value', 'str_value', 'user_id', 'derived'] |
+COMMENT_COLS = [ |
+ 'Comment.id', 'issue_id', 'created', 'Comment.project_id', 'commenter_id', |
+ 'content', 'inbound_message', 'was_escaped', 'deleted_by', |
+ 'Comment.is_spam'] |
+ABBR_COMMENT_COLS = ['Comment.id', 'commenter_id', 'deleted_by'] |
+ATTACHMENT_COLS = [ |
+ 'id', 'issue_id', 'comment_id', 'filename', 'filesize', 'mimetype', |
+ 'deleted', 'gcs_object_id'] |
+ISSUERELATION_COLS = ['issue_id', 'dst_issue_id', 'kind'] |
+DANGLINGRELATION_COLS = [ |
+ 'issue_id', 'dst_issue_project', 'dst_issue_local_id', 'kind'] |
+ISSUEUPDATE_COLS = [ |
+ 'id', 'issue_id', 'comment_id', 'field', 'old_value', 'new_value', |
+ 'added_user_id', 'removed_user_id', 'custom_field_name'] |
+ISSUEFORMERLOCATIONS_COLS = ['issue_id', 'project_id', 'local_id'] |
+REINDEXQUEUE_COLS = ['issue_id', 'created'] |
+ |
+CHUNK_SIZE = 1000 |
+ |
+ |
+class IssueIDTwoLevelCache(caches.AbstractTwoLevelCache): |
+ """Class to manage RAM and memcache for Issue IDs.""" |
+ |
+ def __init__(self, cache_manager, issue_service): |
+ super(IssueIDTwoLevelCache, self).__init__( |
+ cache_manager, 'issue_id', 'issue_id:', int, |
+ max_size=settings.issue_cache_max_size, use_value_centric_cache=True) |
+ self.issue_service = issue_service |
+ |
+ def _DeserializeIssueIDs(self, project_local_issue_ids): |
+ """Convert database rows into a dict {(project_id, local_id): issue_id}.""" |
+ return {(project_id, local_id): issue_id |
+ for (project_id, local_id, issue_id) in project_local_issue_ids} |
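+    # Illustrative example (hypothetical values): the row (789, 42, 100001) |
+    # becomes the dict entry {(789, 42): 100001}. |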
+ |
+ def FetchItems(self, cnxn, keys): |
+ """On RAM and memcache miss, hit the database.""" |
+ local_ids_by_pid = collections.defaultdict(list) |
+ for project_id, local_id in keys: |
+ local_ids_by_pid[project_id].append(local_id) |
+ |
+ where = [] # We OR per-project pairs of conditions together. |
+ for project_id, local_ids_in_project in local_ids_by_pid.iteritems(): |
+ term_str = ('(Issue.project_id = %%s AND Issue.local_id IN (%s))' % |
+ sql.PlaceHolders(local_ids_in_project)) |
+ where.append((term_str, [project_id] + local_ids_in_project)) |
+ |
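+    # With or_where_conds=True below, the per-project terms are ORed |
+    # together, producing SQL shaped like (hypothetical values): |
+    #   WHERE (Issue.project_id = %s AND Issue.local_id IN (%s,%s)) |
+    #      OR (Issue.project_id = %s AND Issue.local_id IN (%s)) |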
+ rows = self.issue_service.issue_tbl.Select( |
+ cnxn, cols=['project_id', 'local_id', 'id'], |
+ where=where, or_where_conds=True) |
+ return self._DeserializeIssueIDs(rows) |
+ |
+ def _KeyToStr(self, key): |
+ """This cache uses pairs of ints as keys. Convert them to strings.""" |
+ return '%d,%d' % key |
+ |
+ def _StrToKey(self, key_str): |
+ """This cache uses pairs of ints as keys. Convert them from strings.""" |
+ project_id_str, local_id_str = key_str.split(',') |
+ return int(project_id_str), int(local_id_str) |
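+    # Illustrative round trip (hypothetical values): |
+    #   _KeyToStr((789, 42)) -> '789,42'; _StrToKey('789,42') -> (789, 42) |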
+ |
+ |
+class IssueTwoLevelCache(caches.AbstractTwoLevelCache): |
+ """Class to manage RAM and memcache for Issue PBs.""" |
+ |
+ def __init__( |
+ self, cache_manager, issue_service, project_service, config_service): |
+ super(IssueTwoLevelCache, self).__init__( |
+ cache_manager, 'issue', 'issue:', tracker_pb2.Issue, |
+ max_size=settings.issue_cache_max_size) |
+ self.issue_service = issue_service |
+ self.project_service = project_service |
+ self.config_service = config_service |
+ |
+ def _UnpackIssue(self, cnxn, issue_row): |
+ """Partially construct an issue object using info from a DB row.""" |
+ (issue_id, project_id, local_id, status_id, owner_id, reporter_id, |
+ opened, closed, modified, derived_owner_id, derived_status_id, |
+ deleted, star_count, attachment_count, is_spam) = issue_row |
+ |
+ issue = tracker_pb2.Issue() |
+ project = self.project_service.GetProject(cnxn, project_id) |
+ issue.project_name = project.project_name |
+ issue.issue_id = issue_id |
+ issue.project_id = project_id |
+ issue.local_id = local_id |
+ if status_id is not None: |
+ status = self.config_service.LookupStatus(cnxn, project_id, status_id) |
+ issue.status = status |
+ issue.owner_id = owner_id or 0 |
+ issue.reporter_id = reporter_id or 0 |
+ issue.derived_owner_id = derived_owner_id or 0 |
+ if derived_status_id is not None: |
+ derived_status = self.config_service.LookupStatus( |
+ cnxn, project_id, derived_status_id) |
+ issue.derived_status = derived_status |
+ issue.deleted = bool(deleted) |
+ if opened: |
+ issue.opened_timestamp = opened |
+ if closed: |
+ issue.closed_timestamp = closed |
+ if modified: |
+ issue.modified_timestamp = modified |
+ issue.star_count = star_count |
+ issue.attachment_count = attachment_count |
+ issue.is_spam = bool(is_spam) |
+ return issue |
+ |
+ def _UnpackFieldValue(self, fv_row): |
+ """Construct a field value object from a DB row.""" |
+ (issue_id, field_id, int_value, str_value, user_id, derived) = fv_row |
+ fv = tracker_bizobj.MakeFieldValue( |
+ field_id, int_value, str_value, user_id, bool(derived)) |
+ return fv, issue_id |
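+    # Illustrative row (hypothetical values): (100001, 5, 37, None, None, 0) |
+    # yields an int-valued FieldValue of 37 for field 5 on issue 100001. |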
+ |
+ def _DeserializeIssues( |
+ self, cnxn, issue_rows, summary_rows, label_rows, component_rows, |
+ cc_rows, notify_rows, fieldvalue_rows, relation_rows, |
+ dangling_relation_rows): |
+ """Convert the given DB rows into a dict of Issue PBs.""" |
+ results_dict = {} |
+ for issue_row in issue_rows: |
+ issue = self._UnpackIssue(cnxn, issue_row) |
+ results_dict[issue.issue_id] = issue |
+ |
+ for issue_id, summary in summary_rows: |
+ results_dict[issue_id].summary = summary |
+ |
+ # TODO(jrobbins): it would be nice to order labels by rank and name. |
+ for issue_id, label_id, derived in label_rows: |
+ issue = results_dict.get(issue_id) |
+ if not issue: |
+ logging.info('Got label for an unknown issue: %r %r', |
+ label_rows, issue_rows) |
+ continue |
+ label = self.config_service.LookupLabel(cnxn, issue.project_id, label_id) |
+ assert label, ('Label ID %r on IID %r not found in project %r' % |
+ (label_id, issue_id, issue.project_id)) |
+ if derived: |
+ results_dict[issue_id].derived_labels.append(label) |
+ else: |
+ results_dict[issue_id].labels.append(label) |
+ |
+ for issue_id, component_id, derived in component_rows: |
+ if derived: |
+ results_dict[issue_id].derived_component_ids.append(component_id) |
+ else: |
+ results_dict[issue_id].component_ids.append(component_id) |
+ |
+ for issue_id, user_id, derived in cc_rows: |
+ if derived: |
+ results_dict[issue_id].derived_cc_ids.append(user_id) |
+ else: |
+ results_dict[issue_id].cc_ids.append(user_id) |
+ |
+ for issue_id, email in notify_rows: |
+ results_dict[issue_id].derived_notify_addrs.append(email) |
+ |
+ for fv_row in fieldvalue_rows: |
+ fv, issue_id = self._UnpackFieldValue(fv_row) |
+ results_dict[issue_id].field_values.append(fv) |
+ |
+ for issue_id, dst_issue_id, kind in relation_rows: |
+ src_issue = results_dict.get(issue_id) |
+ dst_issue = results_dict.get(dst_issue_id) |
+ assert src_issue or dst_issue, ( |
+ 'Neither source issue %r nor dest issue %r was found' % |
+ (issue_id, dst_issue_id)) |
+ if src_issue: |
+ if kind == 'blockedon': |
+ src_issue.blocked_on_iids.append(dst_issue_id) |
+ elif kind == 'mergedinto': |
+ src_issue.merged_into = dst_issue_id |
+ else: |
+ logging.info('unknown relation kind %r', kind) |
+ continue |
+ |
+ if dst_issue: |
+ if kind == 'blockedon': |
+ dst_issue.blocking_iids.append(issue_id) |
+ |
+ for issue_id, dst_issue_proj, dst_issue_id, kind in dangling_relation_rows: |
+ src_issue = results_dict.get(issue_id) |
+ if kind == 'blockedon': |
+ src_issue.dangling_blocked_on_refs.append( |
+ tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, dst_issue_id)) |
+ elif kind == 'blocking': |
+ src_issue.dangling_blocking_refs.append( |
+ tracker_bizobj.MakeDanglingIssueRef(dst_issue_proj, dst_issue_id)) |
+ else: |
+        logging.warn('unhandled dangling relation kind %r', kind) |
+ continue |
+ |
+ return results_dict |
+ |
+  # Note: sharding is used here to allow us to load issues from the replicas |
+ # without placing load on the master. Writes are not sharded. |
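+  # For example (hypothetical), a caller may pass shard_id=3 so that all of |
+  # the SELECTs below are served by the replica for logical shard 3. |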
+ # pylint: disable=arguments-differ |
+ def FetchItems(self, cnxn, issue_ids, shard_id=None): |
+ """Retrieve and deserialize issues.""" |
+ issue_rows = self.issue_service.issue_tbl.Select( |
+ cnxn, cols=ISSUE_COLS, id=issue_ids, shard_id=shard_id) |
+ |
+ summary_rows = self.issue_service.issuesummary_tbl.Select( |
+ cnxn, cols=ISSUESUMMARY_COLS, shard_id=shard_id, issue_id=issue_ids) |
+ label_rows = self.issue_service.issue2label_tbl.Select( |
+ cnxn, cols=ISSUE2LABEL_COLS, shard_id=shard_id, issue_id=issue_ids) |
+ component_rows = self.issue_service.issue2component_tbl.Select( |
+ cnxn, cols=ISSUE2COMPONENT_COLS, shard_id=shard_id, issue_id=issue_ids) |
+ cc_rows = self.issue_service.issue2cc_tbl.Select( |
+ cnxn, cols=ISSUE2CC_COLS, shard_id=shard_id, issue_id=issue_ids) |
+ notify_rows = self.issue_service.issue2notify_tbl.Select( |
+ cnxn, cols=ISSUE2NOTIFY_COLS, shard_id=shard_id, issue_id=issue_ids) |
+ fieldvalue_rows = self.issue_service.issue2fieldvalue_tbl.Select( |
+ cnxn, cols=ISSUE2FIELDVALUE_COLS, shard_id=shard_id, |
+ issue_id=issue_ids) |
+ if issue_ids: |
+ ph = sql.PlaceHolders(issue_ids) |
+ relation_rows = self.issue_service.issuerelation_tbl.Select( |
+ cnxn, cols=ISSUERELATION_COLS, |
+ where=[('(issue_id IN (%s) OR dst_issue_id IN (%s))' % (ph, ph), |
+ issue_ids + issue_ids)]) |
+ dangling_relation_rows = self.issue_service.danglingrelation_tbl.Select( |
+ cnxn, cols=DANGLINGRELATION_COLS, issue_id=issue_ids) |
+ else: |
+ relation_rows = [] |
+ dangling_relation_rows = [] |
+ |
+ return self._DeserializeIssues( |
+ cnxn, issue_rows, summary_rows, label_rows, component_rows, cc_rows, |
+ notify_rows, fieldvalue_rows, relation_rows, dangling_relation_rows) |
+ |
+ |
+class IssueService(object): |
+ """The persistence layer for Monorail's issues, comments, and attachments.""" |
+ spam_labels = ts_mon.CounterMetric('monorail/issue_svc/spam_label') |
+ |
+ def __init__(self, project_service, config_service, cache_manager): |
+ """Initialize this object so that it is ready to use. |
+ |
+ Args: |
+ project_service: services object for project info. |
+ config_service: services object for tracker configuration info. |
+ cache_manager: local cache with distributed invalidation. |
+ """ |
+ # Tables that represent issue data. |
+ self.issue_tbl = sql.SQLTableManager(ISSUE_TABLE_NAME) |
+ self.issuesummary_tbl = sql.SQLTableManager(ISSUESUMMARY_TABLE_NAME) |
+ self.issue2label_tbl = sql.SQLTableManager(ISSUE2LABEL_TABLE_NAME) |
+ self.issue2component_tbl = sql.SQLTableManager(ISSUE2COMPONENT_TABLE_NAME) |
+ self.issue2cc_tbl = sql.SQLTableManager(ISSUE2CC_TABLE_NAME) |
+ self.issue2notify_tbl = sql.SQLTableManager(ISSUE2NOTIFY_TABLE_NAME) |
+ self.issue2fieldvalue_tbl = sql.SQLTableManager(ISSUE2FIELDVALUE_TABLE_NAME) |
+ self.issuerelation_tbl = sql.SQLTableManager(ISSUERELATION_TABLE_NAME) |
+ self.danglingrelation_tbl = sql.SQLTableManager(DANGLINGRELATION_TABLE_NAME) |
+ self.issueformerlocations_tbl = sql.SQLTableManager( |
+ ISSUEFORMERLOCATIONS_TABLE_NAME) |
+ |
+ # Tables that represent comments. |
+ self.comment_tbl = sql.SQLTableManager(COMMENT_TABLE_NAME) |
+ self.issueupdate_tbl = sql.SQLTableManager(ISSUEUPDATE_TABLE_NAME) |
+ self.attachment_tbl = sql.SQLTableManager(ATTACHMENT_TABLE_NAME) |
+ |
+ # Tables for cron tasks. |
+ self.reindexqueue_tbl = sql.SQLTableManager(REINDEXQUEUE_TABLE_NAME) |
+ |
+ # Tables for generating sequences of local IDs. |
+ self.localidcounter_tbl = sql.SQLTableManager(LOCALIDCOUNTER_TABLE_NAME) |
+ |
+ # Like a dictionary {(project_id, local_id): issue_id} |
+  # Use a value-centric cache here because we cannot store a tuple in the |
+ # Invalidate table. |
+ self.issue_id_2lc = IssueIDTwoLevelCache(cache_manager, self) |
+ # Like a dictionary {issue_id: issue} |
+ self.issue_2lc = IssueTwoLevelCache( |
+ cache_manager, self, project_service, config_service) |
+ |
+ self._config_service = config_service |
+ |
+ ### Issue ID lookups |
+ |
+ def LookupIssueIDs(self, cnxn, project_local_id_pairs): |
+ """Find the global issue IDs given the project ID and local ID of each.""" |
+ issue_id_dict, _misses = self.issue_id_2lc.GetAll( |
+ cnxn, project_local_id_pairs) |
+ |
+ # Put the Issue IDs in the order specified by project_local_id_pairs |
+ issue_ids = [issue_id_dict[pair] for pair in project_local_id_pairs |
+ if pair in issue_id_dict] |
+ |
+ return issue_ids |
+ |
+ def LookupIssueID(self, cnxn, project_id, local_id): |
+ """Find the global issue ID given the project ID and local ID.""" |
+ issue_ids = self.LookupIssueIDs(cnxn, [(project_id, local_id)]) |
+ try: |
+ return issue_ids[0] |
+ except IndexError: |
+ raise NoSuchIssueException() |
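+  # Illustrative usage (hypothetical values): LookupIssueID(cnxn, 789, 42) |
+  # returns the global issue_id of issue 42 in project 789, or raises |
+  # NoSuchIssueException if that issue does not exist. |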
+ |
+ def ResolveIssueRefs( |
+ self, cnxn, ref_projects, default_project_name, refs): |
+ """Look up all the referenced issues and return their issue_ids. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ ref_projects: pre-fetched dict {project_name: project} of all projects |
+ mentioned in the refs as well as the default project. |
+ default_project_name: string name of the current project, this is used |
+ when the project_name in a ref is None. |
+ refs: list of (project_name, local_id) pairs. These are parsed from |
+ textual references in issue descriptions, comments, and the input |
+ in the blocked-on field. |
+ |
+ Returns: |
+ A list of issue_ids for all the referenced issues. References to issues |
+ in deleted projects and any issues not found are simply ignored. |
+ """ |
+ if not refs: |
+ return [] |
+ |
+ project_local_id_pairs = [] |
+ for project_name, local_id in refs: |
+ project = ref_projects.get(project_name or default_project_name) |
+ if not project or project.state == project_pb2.ProjectState.DELETABLE: |
+ continue # ignore any refs to issues in deleted projects |
+ project_local_id_pairs.append((project.project_id, local_id)) |
+ |
+ issue_ids = self.LookupIssueIDs(cnxn, project_local_id_pairs) |
+ return issue_ids |
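+  # Illustrative call (hypothetical values): refs parsed from text such as |
+  # 'issue 12, proj-b:34' arrive as [(None, 12), ('proj-b', 34)], and the |
+  # None project name falls back to default_project_name. |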
+ |
+ ### Issue objects |
+ |
+ def CreateIssue( |
+ self, cnxn, services, project_id, summary, status, |
+ owner_id, cc_ids, labels, field_values, component_ids, reporter_id, |
+ marked_description, blocked_on=None, blocking=None, attachments=None, |
+ timestamp=None, index_now=True): |
+ """Create and store a new issue with all the given information. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ services: persistence layer for users, issues, and projects. |
+ project_id: int ID for the current project. |
+ summary: one-line summary string summarizing this issue. |
+ status: string issue status value. E.g., 'New'. |
+ owner_id: user ID of the issue owner. |
+ cc_ids: list of user IDs for users to be CC'd on changes. |
+ labels: list of label strings. E.g., 'Priority-High'. |
+ field_values: list of FieldValue PBs. |
+ component_ids: list of int component IDs. |
+ reporter_id: user ID of the user who reported the issue. |
+ marked_description: issue description with initial HTML markup. |
+ blocked_on: list of issue_ids that this issue is blocked on. |
+ blocking: list of issue_ids that this issue blocks. |
+ attachments: [(filename, contents, mimetype),...] attachments uploaded at |
+ the time the comment was made. |
+ timestamp: time that the issue was entered, defaults to now. |
+ index_now: True if the issue should be updated in the full text index. |
+ |
+ Returns: |
+ The integer local ID of the new issue. |
+ """ |
+ config = self._config_service.GetProjectConfig(cnxn, project_id) |
+ iids_to_invalidate = set() |
+ |
+ status = framework_bizobj.CanonicalizeLabel(status) |
+ labels = [framework_bizobj.CanonicalizeLabel(l) for l in labels] |
+ labels = [l for l in labels if l] |
+ |
+ issue = tracker_pb2.Issue() |
+ issue.project_id = project_id |
+ issue.summary = summary |
+ issue.status = status |
+ issue.owner_id = owner_id |
+ issue.cc_ids.extend(cc_ids) |
+ issue.labels.extend(labels) |
+ issue.field_values.extend(field_values) |
+ issue.component_ids.extend(component_ids) |
+ issue.reporter_id = reporter_id |
+ if blocked_on is not None: |
+ iids_to_invalidate.update(blocked_on) |
+ issue.blocked_on_iids = blocked_on |
+ if blocking is not None: |
+ iids_to_invalidate.update(blocking) |
+ issue.blocking_iids = blocking |
+ if attachments: |
+ issue.attachment_count = len(attachments) |
+ timestamp = timestamp or int(time.time()) |
+ issue.opened_timestamp = timestamp |
+ issue.modified_timestamp = timestamp |
+ |
+ comment = self._MakeIssueComment( |
+ project_id, reporter_id, marked_description, |
+ attachments=attachments, timestamp=timestamp, was_escaped=True) |
+ |
+ # Set the closed_timestamp both before and after filter rules. |
+ if not tracker_helpers.MeansOpenInProject( |
+ tracker_bizobj.GetStatus(issue), config): |
+ issue.closed_timestamp = timestamp |
+ filterrules_helpers.ApplyFilterRules(cnxn, services, issue, config) |
+ if not tracker_helpers.MeansOpenInProject( |
+ tracker_bizobj.GetStatus(issue), config): |
+ issue.closed_timestamp = timestamp |
+ |
+ classification = services.spam.ClassifyIssue(issue, comment) |
+ |
+ label = classification['outputLabel'] |
+    logging.info('issue/comment classification: %s', classification) |
+ score = 0 |
+ for output in classification['outputMulti']: |
+ if output['label'] == label: |
+ score = float(output['score']) |
+ |
+ self.spam_labels.increment({'type': label}) |
+ |
+ if label == 'spam' and score > settings.classifier_spam_thresh: |
+ # Must be negative so as not to use up actual local_ids. |
+ # This can be fixed later if a human declares it to be ham. |
+ issue.local_id = self.AllocateNextSpamLocalID(cnxn, project_id) |
+ issue.is_spam = True |
+ else: |
+ issue.local_id = self.AllocateNextLocalID(cnxn, project_id) |
+ |
+ issue_id = self.InsertIssue(cnxn, issue) |
+ comment.issue_id = issue_id |
+ self.InsertComment(cnxn, comment) |
+ |
+ issue.issue_id = issue_id |
+ services.spam.RecordClassifierIssueVerdict( |
+ cnxn, issue, label=='spam', score) |
+ |
+ if permissions.HasRestrictions(issue, 'view'): |
+ self._config_service.InvalidateMemcache( |
+ [issue], key_prefix='nonviewable:') |
+ |
+ # Add a comment to existing issues saying they are now blocking or |
+ # blocked on this issue. |
+ blocked_add_issues = self.GetIssues(cnxn, blocked_on or []) |
+ for add_issue in blocked_add_issues: |
+ self.CreateIssueComment( |
+ cnxn, add_issue.project_id, add_issue.local_id, reporter_id, |
+ content='', |
+ amendments=[tracker_bizobj.MakeBlockingAmendment( |
+ [(issue.project_name, issue.local_id)], [], |
+ default_project_name=add_issue.project_name)]) |
+ blocking_add_issues = self.GetIssues(cnxn, blocking or []) |
+ for add_issue in blocking_add_issues: |
+ self.CreateIssueComment( |
+ cnxn, add_issue.project_id, add_issue.local_id, reporter_id, |
+ content='', |
+ amendments=[tracker_bizobj.MakeBlockedOnAmendment( |
+ [(issue.project_name, issue.local_id)], [], |
+ default_project_name=add_issue.project_name)]) |
+ |
+ self._UpdateIssuesModified( |
+ cnxn, iids_to_invalidate, modified_timestamp=timestamp) |
+ |
+ if index_now: |
+ tracker_fulltext.IndexIssues( |
+ cnxn, [issue], services.user, self, self._config_service) |
+ |
+ return issue.local_id |
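+  # Illustrative usage (hypothetical IDs and values): |
+  #   local_id = issue_service.CreateIssue( |
+  #       cnxn, services, 789, 'Crash on startup', 'New', 111, [222, 333], |
+  #       ['Priority-High'], [], [], 111, 'Steps to reproduce: ...') |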
+ |
+  def AllocateNewLocalIDs(self, cnxn, issues): |
+    """Assign real local IDs to issues that have temporary negative ones.""" |
+    # Issues classified as spam are initially given negative local IDs, so |
+    # filter to just the issues that need new local IDs. |
+    issues = [issue for issue in issues if issue.local_id < 0] |
+ |
+    for issue in issues: |
+      issue.local_id = self.AllocateNextLocalID(cnxn, issue.project_id) |
+ |
+    self.UpdateIssues(cnxn, issues) |
+ |
+    logging.info('AllocateNewLocalIDs') |
+ |
+ def GetAllIssuesInProject(self, cnxn, project_id, min_local_id=None): |
+ """Special query to efficiently get ALL issues in a project. |
+ |
+    This is not done while the user is waiting, only by background tasks. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: the ID of the project. |
+ min_local_id: optional int to start at. |
+ |
+ Returns: |
+ A list of Issue protocol buffers for all issues. |
+ """ |
+ all_local_ids = self.GetAllLocalIDsInProject( |
+ cnxn, project_id, min_local_id=min_local_id) |
+ return self.GetIssuesByLocalIDs(cnxn, project_id, all_local_ids) |
+ |
+ def GetAnyOnHandIssue(self, issue_ids, start=None, end=None): |
+ """Get any one issue from RAM or memcache, otherwise return None.""" |
+ return self.issue_2lc.GetAnyOnHandItem(issue_ids, start=start, end=end) |
+ |
+ def GetIssuesDict(self, cnxn, issue_ids, use_cache=True, shard_id=None): |
+ """Get a dict {iid: issue} from the DB or cache.""" |
+ issue_dict, _missed_iids = self.issue_2lc.GetAll( |
+ cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id) |
+ return issue_dict |
+ |
+ def GetIssues(self, cnxn, issue_ids, use_cache=True, shard_id=None): |
+ """Get a list of Issue PBs from the DB or cache. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ issue_ids: integer global issue IDs of the issues. |
+ use_cache: optional boolean to turn off using the cache. |
+ shard_id: optional int shard_id to limit retrieval. |
+ |
+ Returns: |
+ A list of Issue PBs in the same order as the given issue_ids. |
+ """ |
+ issue_dict = self.GetIssuesDict( |
+ cnxn, issue_ids, use_cache=use_cache, shard_id=shard_id) |
+ |
+ # Return a list that is ordered the same as the given issue_ids. |
+ issue_list = [issue_dict[issue_id] for issue_id in issue_ids |
+ if issue_id in issue_dict] |
+ |
+ return issue_list |
+ |
+ def GetIssue(self, cnxn, issue_id): |
+ """Get one Issue PB from the DB. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ issue_id: integer global issue ID of the issue. |
+ |
+ Returns: |
+ The requested Issue protocol buffer. |
+ |
+ Raises: |
+ NoSuchIssueException: the issue was not found. |
+ """ |
+ issues = self.GetIssues(cnxn, [issue_id]) |
+ try: |
+ return issues[0] |
+ except IndexError: |
+ raise NoSuchIssueException() |
+ |
+ def GetIssuesByLocalIDs( |
+ self, cnxn, project_id, local_id_list, shard_id=None): |
+ """Get all the requested issues. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: int ID of the project to which the issues belong. |
+ local_id_list: list of integer local IDs for the requested issues. |
+ shard_id: optional int shard_id to choose a replica. |
+ |
+ Returns: |
+ List of Issue PBs for the requested issues. The result Issues |
+ will be ordered in the same order as local_id_list. |
+ """ |
+ issue_ids_to_fetch = self.LookupIssueIDs( |
+ cnxn, [(project_id, local_id) for local_id in local_id_list]) |
+ issues = self.GetIssues(cnxn, issue_ids_to_fetch, shard_id=shard_id) |
+ return issues |
+ |
+ def GetIssueByLocalID(self, cnxn, project_id, local_id): |
+ """Get one Issue PB from the DB. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: the ID of the project to which the issue belongs. |
+ local_id: integer local ID of the issue. |
+ |
+ Returns: |
+ The requested Issue protocol buffer. |
+ """ |
+ issues = self.GetIssuesByLocalIDs(cnxn, project_id, [local_id]) |
+ try: |
+ return issues[0] |
+ except IndexError: |
+ raise NoSuchIssueException('The issue %s:%d does not exist.' % ( |
+ project_id, local_id)) |
+ |
+ def GetOpenAndClosedIssues(self, cnxn, issue_ids): |
+ """Return the requested issues in separate open and closed lists. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+      issue_ids: list of int issue IDs. |
+ |
+ Returns: |
+ A pair of lists, the first with open issues, second with closed issues. |
+ """ |
+ if not issue_ids: |
+ return [], [] # make one common case efficient |
+ |
+ issues = self.GetIssues(cnxn, issue_ids) |
+ project_ids = {issue.project_id for issue in issues} |
+ configs = self._config_service.GetProjectConfigs(cnxn, project_ids) |
+ open_issues = [] |
+ closed_issues = [] |
+ for issue in issues: |
+ config = configs[issue.project_id] |
+ if tracker_helpers.MeansOpenInProject( |
+ tracker_bizobj.GetStatus(issue), config): |
+ open_issues.append(issue) |
+ else: |
+ closed_issues.append(issue) |
+ |
+ return open_issues, closed_issues |
+ |
+ def GetCurrentLocationOfMovedIssue(self, cnxn, project_id, local_id): |
+ """Return the current location of a moved issue based on old location.""" |
+ issue_id = int(self.issueformerlocations_tbl.SelectValue( |
+ cnxn, 'issue_id', default=0, project_id=project_id, local_id=local_id)) |
+ if not issue_id: |
+ return None, None |
+ project_id, local_id = self.issue_tbl.SelectRow( |
+ cnxn, cols=['project_id', 'local_id'], id=issue_id) |
+ return project_id, local_id |
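+  # Example (hypothetical values): if the issue formerly at (789, 42) now |
+  # lives in project 790 as local_id 7, this returns (790, 7); it returns |
+  # (None, None) when no former location matches. |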
+ |
+ def GetPreviousLocations(self, cnxn, issue): |
+ """Get all the previous locations of an issue.""" |
+ location_rows = self.issueformerlocations_tbl.Select( |
+ cnxn, cols=['project_id', 'local_id'], issue_id=issue.issue_id) |
+ locations = [(pid, local_id) for (pid, local_id) in location_rows |
+ if pid != issue.project_id or local_id != issue.local_id] |
+ return locations |
+ |
+ def InsertIssue(self, cnxn, issue): |
+ """Store the given issue in SQL. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ issue: Issue PB to insert into the database. |
+ |
+ Returns: |
+ The int issue_id of the newly created issue. |
+ """ |
+ status_id = self._config_service.LookupStatusID( |
+ cnxn, issue.project_id, issue.status) |
+ row = (issue.project_id, issue.local_id, status_id, |
+ issue.owner_id or None, |
+ issue.reporter_id, |
+ issue.opened_timestamp, |
+ issue.closed_timestamp, |
+ issue.modified_timestamp, |
+ issue.derived_owner_id or None, |
+ self._config_service.LookupStatusID( |
+ cnxn, issue.project_id, issue.derived_status), |
+ bool(issue.deleted), |
+ issue.star_count, issue.attachment_count, |
+ issue.is_spam) |
+    # ISSUE_COLS[1:] to skip setting the ID. |
+ # Insert into the Master DB. |
+ generated_ids = self.issue_tbl.InsertRows( |
+ cnxn, ISSUE_COLS[1:], [row], commit=False, return_generated_ids=True) |
+ issue_id = generated_ids[0] |
+ issue.issue_id = issue_id |
+ self.issue_tbl.Update( |
+ cnxn, {'shard': issue_id % settings.num_logical_shards}, |
+ id=issue.issue_id, commit=False) |
+ |
+ self._UpdateIssuesSummary(cnxn, [issue], commit=False) |
+ self._UpdateIssuesLabels( |
+ cnxn, [issue], issue.project_id, commit=False) |
+ self._UpdateIssuesFields(cnxn, [issue], commit=False) |
+ self._UpdateIssuesComponents(cnxn, [issue], commit=False) |
+ self._UpdateIssuesCc(cnxn, [issue], commit=False) |
+ self._UpdateIssuesNotify(cnxn, [issue], commit=False) |
+ self._UpdateIssuesRelation(cnxn, [issue], commit=False) |
+ cnxn.Commit() |
+ self._config_service.InvalidateMemcache([issue]) |
+ |
+ return issue_id |
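+  # Note: the shard column above is derived from the generated issue_id, |
+  # e.g. (hypothetical) issue_id 100007 with 10 logical shards lands in |
+  # shard 7. |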
+ |
+ def UpdateIssues( |
+ self, cnxn, issues, update_cols=None, just_derived=False, commit=True, |
+ invalidate=True): |
+ """Update the given issues in SQL. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ issues: list of issues to update. |
+ update_cols: optional list of just the field names to update. |
+ just_derived: set to True when only updating derived fields. |
+ commit: set to False to skip the DB commit and do it in the caller. |
+      invalidate: set to False to leave cache invalidation to the caller. |
+ """ |
+ if not issues: |
+ return |
+ |
+ project_id = issues[0].project_id # All must be in the same project. |
+ assert all(issue.project_id == project_id for issue in issues) |
+ |
+    for issue in issues:  # Slow, but MySQL will not allow us to REPLACE rows. |
+ delta = { |
+ 'project_id': issue.project_id, |
+ 'local_id': issue.local_id, |
+ 'owner_id': issue.owner_id or None, |
+ 'status_id': self._config_service.LookupStatusID( |
+ cnxn, issue.project_id, issue.status) or None, |
+ 'opened': issue.opened_timestamp, |
+ 'closed': issue.closed_timestamp, |
+ 'modified': issue.modified_timestamp, |
+ 'derived_owner_id': issue.derived_owner_id or None, |
+ 'derived_status_id': self._config_service.LookupStatusID( |
+ cnxn, issue.project_id, issue.derived_status) or None, |
+ 'deleted': bool(issue.deleted), |
+ 'star_count': issue.star_count, |
+ 'attachment_count': issue.attachment_count, |
+ 'is_spam': issue.is_spam, |
+ } |
+ if update_cols is not None: |
+ delta = {key: val for key, val in delta.iteritems() |
+ if key in update_cols} |
+ self.issue_tbl.Update(cnxn, delta, id=issue.issue_id, commit=False) |
+ |
+ if not update_cols: |
+ self._UpdateIssuesLabels( |
+ cnxn, issues, project_id, commit=False) |
+ self._UpdateIssuesCc(cnxn, issues, commit=False) |
+ self._UpdateIssuesFields(cnxn, issues, commit=False) |
+ self._UpdateIssuesComponents(cnxn, issues, commit=False) |
+ self._UpdateIssuesNotify(cnxn, issues, commit=False) |
+ if not just_derived: |
+ self._UpdateIssuesSummary(cnxn, issues, commit=False) |
+ self._UpdateIssuesRelation(cnxn, issues, commit=False) |
+ |
+ iids_to_invalidate = [issue.issue_id for issue in issues] |
+ if just_derived and invalidate: |
+ self.issue_2lc.InvalidateAllKeys(cnxn, iids_to_invalidate) |
+ elif invalidate: |
+ self.issue_2lc.InvalidateKeys(cnxn, iids_to_invalidate) |
+ if commit: |
+ cnxn.Commit() |
+ if invalidate: |
+ self._config_service.InvalidateMemcache(issues) |
+ |
+ def UpdateIssue( |
+ self, cnxn, issue, update_cols=None, just_derived=False, commit=True, |
+ invalidate=True): |
+ """Update the given issue in SQL. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ issue: the issue to update. |
+ update_cols: optional list of just the field names to update. |
+ just_derived: set to True when only updating derived fields. |
+ commit: set to False to skip the DB commit and do it in the caller. |
+      invalidate: set to False to leave cache invalidation to the caller. |
+ """ |
+ self.UpdateIssues( |
+ cnxn, [issue], update_cols=update_cols, just_derived=just_derived, |
+ commit=commit, invalidate=invalidate) |
+ |
+ def _UpdateIssuesSummary(self, cnxn, issues, commit=True): |
+ """Update the IssueSummary table rows for the given issues.""" |
+ self.issuesummary_tbl.InsertRows( |
+ cnxn, ISSUESUMMARY_COLS, |
+ [(issue.issue_id, issue.summary) for issue in issues], |
+ replace=True, commit=commit) |
+ |
+ def _UpdateIssuesLabels(self, cnxn, issues, project_id, commit=True): |
+ """Update the Issue2Label table rows for the given issues.""" |
+ label_rows = [] |
+ for issue in issues: |
+ issue_shard = issue.issue_id % settings.num_logical_shards |
+ # TODO(jrobbins): If the user adds many novel labels in one issue update, |
+ # that could be slow. Solution is to add all new labels in a batch first. |
+ label_rows.extend( |
+ (issue.issue_id, |
+ self._config_service.LookupLabelID(cnxn, project_id, label), False, |
+ issue_shard) |
+ for label in issue.labels) |
+ label_rows.extend( |
+ (issue.issue_id, |
+ self._config_service.LookupLabelID(cnxn, project_id, label), True, |
+ issue_shard) |
+ for label in issue.derived_labels) |
+ |
+ self.issue2label_tbl.Delete( |
+ cnxn, issue_id=[issue.issue_id for issue in issues], |
+ commit=False) |
+ self.issue2label_tbl.InsertRows( |
+ cnxn, ISSUE2LABEL_COLS + ['issue_shard'], |
+ label_rows, ignore=True, commit=commit) |
+ |
+ def _UpdateIssuesFields(self, cnxn, issues, commit=True): |
+ """Update the Issue2FieldValue table rows for the given issues.""" |
+ fieldvalue_rows = [] |
+ for issue in issues: |
+ issue_shard = issue.issue_id % settings.num_logical_shards |
+ for fv in issue.field_values: |
+ fieldvalue_rows.append( |
+ (issue.issue_id, fv.field_id, fv.int_value, fv.str_value, |
+ fv.user_id or None, fv.derived, issue_shard)) |
+ |
+ self.issue2fieldvalue_tbl.Delete( |
+ cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
+ self.issue2fieldvalue_tbl.InsertRows( |
+ cnxn, ISSUE2FIELDVALUE_COLS + ['issue_shard'], |
+ fieldvalue_rows, commit=commit) |
+ |
+ def _UpdateIssuesComponents(self, cnxn, issues, commit=True): |
+ """Update the Issue2Component table rows for the given issues.""" |
+ issue2component_rows = [] |
+ for issue in issues: |
+ issue_shard = issue.issue_id % settings.num_logical_shards |
+ issue2component_rows.extend( |
+ (issue.issue_id, component_id, False, issue_shard) |
+ for component_id in issue.component_ids) |
+ issue2component_rows.extend( |
+ (issue.issue_id, component_id, True, issue_shard) |
+ for component_id in issue.derived_component_ids) |
+ |
+ self.issue2component_tbl.Delete( |
+ cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
+ self.issue2component_tbl.InsertRows( |
+ cnxn, ISSUE2COMPONENT_COLS + ['issue_shard'], |
+ issue2component_rows, ignore=True, commit=commit) |
+ |
+ def _UpdateIssuesCc(self, cnxn, issues, commit=True): |
+ """Update the Issue2Cc table rows for the given issues.""" |
+ cc_rows = [] |
+ for issue in issues: |
+ issue_shard = issue.issue_id % settings.num_logical_shards |
+ cc_rows.extend( |
+ (issue.issue_id, cc_id, False, issue_shard) |
+ for cc_id in issue.cc_ids) |
+ cc_rows.extend( |
+ (issue.issue_id, cc_id, True, issue_shard) |
+ for cc_id in issue.derived_cc_ids) |
+ |
+ self.issue2cc_tbl.Delete( |
+ cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
+ self.issue2cc_tbl.InsertRows( |
+ cnxn, ISSUE2CC_COLS + ['issue_shard'], |
+ cc_rows, ignore=True, commit=commit) |
+ |
+ def _UpdateIssuesNotify(self, cnxn, issues, commit=True): |
+ """Update the Issue2Notify table rows for the given issues.""" |
+ notify_rows = [] |
+ for issue in issues: |
+ derived_rows = [[issue.issue_id, email] |
+ for email in issue.derived_notify_addrs] |
+ notify_rows.extend(derived_rows) |
+ |
+ self.issue2notify_tbl.Delete( |
+ cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
+ self.issue2notify_tbl.InsertRows( |
+ cnxn, ISSUE2NOTIFY_COLS, notify_rows, ignore=True, commit=commit) |
+ |
+ def _UpdateIssuesRelation(self, cnxn, issues, commit=True): |
+ """Update the IssueRelation table rows for the given issues.""" |
+ relation_rows = [] |
+ dangling_relation_rows = [] |
+ for issue in issues: |
+ for dst_issue_id in issue.blocked_on_iids: |
+ relation_rows.append((issue.issue_id, dst_issue_id, 'blockedon')) |
+ for dst_issue_id in issue.blocking_iids: |
+ relation_rows.append((dst_issue_id, issue.issue_id, 'blockedon')) |
+ for dst_ref in issue.dangling_blocked_on_refs: |
+ dangling_relation_rows.append(( |
+ issue.issue_id, dst_ref.project, dst_ref.issue_id, 'blockedon')) |
+ for dst_ref in issue.dangling_blocking_refs: |
+ dangling_relation_rows.append(( |
+ issue.issue_id, dst_ref.project, dst_ref.issue_id, 'blocking')) |
+ if issue.merged_into: |
+ relation_rows.append((issue.issue_id, issue.merged_into, 'mergedinto')) |
+ |
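+    # Note: 'blocking' edges were stored above as inverse 'blockedon' rows, |
+    # e.g. (hypothetical) issue 1001 blocking issue 1002 is stored as the |
+    # row (1002, 1001, 'blockedon'), which is why the Delete below also |
+    # matches on dst_issue_id. |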
+ self.issuerelation_tbl.Delete( |
+ cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
+ self.issuerelation_tbl.Delete( |
+ cnxn, dst_issue_id=[issue.issue_id for issue in issues], |
+ kind='blockedon', commit=False) |
+ self.issuerelation_tbl.InsertRows( |
+ cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit) |
+ self.danglingrelation_tbl.Delete( |
+ cnxn, issue_id=[issue.issue_id for issue in issues], commit=False) |
+ self.danglingrelation_tbl.InsertRows( |
+ cnxn, DANGLINGRELATION_COLS, dangling_relation_rows, ignore=True, |
+ commit=commit) |
+ |
+ def _UpdateIssuesModified( |
+ self, cnxn, iids, modified_timestamp=None, invalidate=True): |
+ """Store a modified timestamp for each of the specified issues.""" |
+ delta = {'modified': modified_timestamp or int(time.time())} |
+ self.issue_tbl.Update(cnxn, delta, id=iids, commit=False) |
+ if invalidate: |
+ self.InvalidateIIDs(cnxn, iids) |
+ |
+ def DeltaUpdateIssue( |
+ self, cnxn, services, reporter_id, project_id, |
+ config, issue, status, owner_id, cc_add, cc_remove, comp_ids_add, |
+ comp_ids_remove, labels_add, labels_remove, field_vals_add, |
+ field_vals_remove, fields_clear, blocked_on_add=None, |
+ blocked_on_remove=None, blocking_add=None, blocking_remove=None, |
+ merged_into=None, index_now=False, comment=None, summary=None, |
+ iids_to_invalidate=None, rules=None, predicate_asts=None, |
+ timestamp=None): |
+ """Update the issue in the database and return a set of update tuples. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ services: connections to persistence layer. |
+ reporter_id: user ID of the user making this change. |
+ project_id: int ID for the current project. |
+ config: ProjectIssueConfig PB for this project. |
+ issue: Issue PB of issue to update. |
+ status: new issue status string, if a change is desired. |
+ owner_id: user ID of the new issue owner, if a change is desired. |
+ cc_add: list of user IDs of users to add to CC list. |
+ cc_remove: list of user IDs of users to remove from CC list. |
+ comp_ids_add: list of component IDs to add to the issue. |
+ comp_ids_remove: list of component IDs to remove from the issue. |
+ labels_add: list of issue label strings to add. |
+ labels_remove: list of issue label strings to remove. |
+ field_vals_add: dict of FieldValue PBs to add. |
+ field_vals_remove: list of FieldValue PBs to remove. |
+ fields_clear: list of custom field IDs to clear. |
+ blocked_on_add: list of IIDs that this issue is now blocked on. |
+ blocked_on_remove: list of IIDs that this issue is no longer blocked on. |
+ blocking_add: list of IIDs that this issue is blocking. |
+ blocking_remove: list of IIDs that this issue is no longer blocking. |
+ merged_into: IID of issue that this issue was merged into, 0 to clear, |
+ or None for no change. |
+ index_now: True if the issue should be updated in the full text index. |
+      comment: string content of the comment corresponding to this change. |
+ summary: new issue summary, currently only used by GData API. |
+ rules: optional list of preloaded FilterRule PBs for this project. |
+ predicate_asts: optional list of QueryASTs for the rules. If rules are |
+ provided, then predicate_asts should also be provided. |
+ timestamp: int timestamp set during testing, otherwise defaults to |
+ int(time.time()). |
+ |
+ Returns: |
+      A tuple (amendments, comment_pb): a list of Amendment PBs describing |
+      the metadata updates that the user made, and the IssueComment PB for |
+      the change (or None if there was nothing to record). |
+ """ |
+ old_effective_status = tracker_bizobj.GetStatus(issue) |
+ |
+ # Make all user input safe to echo out again later. |
+ status = framework_bizobj.CanonicalizeLabel(status) |
+ labels_add = [framework_bizobj.CanonicalizeLabel(l) for l in labels_add] |
+ labels_add = [l for l in labels_add if l] |
+ labels_remove = [framework_bizobj.CanonicalizeLabel(l) |
+ for l in labels_remove] |
+ labels_remove = [l for l in labels_remove if l] |
+ |
+ logging.info( |
+ 'Bulk edit to project_id %s issue.local_id %s', |
+ project_id, issue.local_id) |
+ if iids_to_invalidate is None: |
+ iids_to_invalidate = set([issue.issue_id]) |
+ invalidate = True |
+ else: |
+ iids_to_invalidate.add(issue.issue_id) |
+ invalidate = False # Caller will do it. |
+ |
+    # Store each updated value in the issue PB, and compute amendment PBs. |
+ amendments = [] |
+ if status is not None and status != issue.status: |
+ amendments.append(tracker_bizobj.MakeStatusAmendment( |
+ status, issue.status)) |
+ issue.status = status |
+ if owner_id is not None and owner_id != issue.owner_id: |
+ amendments.append(tracker_bizobj.MakeOwnerAmendment( |
+ owner_id, issue.owner_id)) |
+ issue.owner_id = owner_id |
+ |
+ # compute the set of cc'd users added and removed |
+ cc_add = [cc for cc in cc_add if cc not in issue.cc_ids] |
+ cc_remove = [cc for cc in cc_remove if cc in issue.cc_ids] |
+ if cc_add or cc_remove: |
+ cc_ids = [cc for cc in list(issue.cc_ids) + cc_add |
+ if cc not in cc_remove] |
+ issue.cc_ids = cc_ids |
+ amendments.append(tracker_bizobj.MakeCcAmendment(cc_add, cc_remove)) |
+ |
+ # compute the set of components added and removed |
+ comp_ids_add = [c for c in comp_ids_add if c not in issue.component_ids] |
+ comp_ids_remove = [c for c in comp_ids_remove if c in issue.component_ids] |
+ if comp_ids_add or comp_ids_remove: |
+ comp_ids = [cid for cid in list(issue.component_ids) + comp_ids_add |
+ if cid not in comp_ids_remove] |
+ issue.component_ids = comp_ids |
+ amendments.append(tracker_bizobj.MakeComponentsAmendment( |
+ comp_ids_add, comp_ids_remove, config)) |
+ |
+ # compute the set of labels added and removed |
+ (labels, update_labels_add, |
+ update_labels_remove) = framework_bizobj.MergeLabels( |
+ issue.labels, labels_add, labels_remove, |
+ config.exclusive_label_prefixes) |
+ |
+ if update_labels_add or update_labels_remove: |
+ issue.labels = labels |
+ amendments.append(tracker_bizobj.MakeLabelsAmendment( |
+ update_labels_add, update_labels_remove)) |
+ |
+ # compute the set of custom fields added and removed |
+ (field_vals, update_fields_add, |
+ update_fields_remove) = tracker_bizobj.MergeFields( |
+ issue.field_values, field_vals_add, field_vals_remove, |
+ config.field_defs) |
+ |
+ if update_fields_add or update_fields_remove: |
+ issue.field_values = field_vals |
+ for fd in config.field_defs: |
+ added_values_this_field = [ |
+ fv for fv in update_fields_add if fv.field_id == fd.field_id] |
+ if added_values_this_field: |
+ amendments.append(tracker_bizobj.MakeFieldAmendment( |
+ fd.field_id, config, |
+ [tracker_bizobj.GetFieldValue(fv, {}) |
+ for fv in added_values_this_field], |
+ old_values=[])) |
+ removed_values_this_field = [ |
+ fv for fv in update_fields_remove if fv.field_id == fd.field_id] |
+ if removed_values_this_field: |
+ amendments.append(tracker_bizobj.MakeFieldAmendment( |
+ fd.field_id, config, [], |
+ old_values=[tracker_bizobj.GetFieldValue(fv, {}) |
+ for fv in removed_values_this_field])) |
+ |
+ if fields_clear: |
+ field_clear_set = set(fields_clear) |
+ revised_fields = [] |
+ for fd in config.field_defs: |
+ if fd.field_id not in field_clear_set: |
+ revised_fields.extend( |
+ fv for fv in issue.field_values if fv.field_id == fd.field_id) |
+ else: |
+ amendments.append( |
+ tracker_bizobj.MakeFieldClearedAmendment(fd.field_id, config)) |
+ if fd.field_type == tracker_pb2.FieldTypes.ENUM_TYPE: |
+ prefix = fd.field_name.lower() + '-' |
+ filtered_labels = [ |
+ lab for lab in issue.labels |
+ if not lab.lower().startswith(prefix)] |
+ issue.labels = filtered_labels |
+ |
+ issue.field_values = revised_fields |
+ |
+ if blocked_on_add or blocked_on_remove: |
+ old_blocked_on = issue.blocked_on_iids |
+ blocked_on_add = [iid for iid in blocked_on_add |
+ if iid not in old_blocked_on] |
+ add_refs = [(ref_issue.project_name, ref_issue.local_id) |
+ for ref_issue in self.GetIssues(cnxn, blocked_on_add)] |
+ blocked_on_rm = [iid for iid in blocked_on_remove |
+ if iid in old_blocked_on] |
+ remove_refs = [ |
+ (ref_issue.project_name, ref_issue.local_id) |
+ for ref_issue in self.GetIssues(cnxn, blocked_on_rm)] |
+ amendments.append(tracker_bizobj.MakeBlockedOnAmendment( |
+ add_refs, remove_refs, default_project_name=issue.project_name)) |
+ blocked_on = [iid for iid in old_blocked_on + blocked_on_add |
+ if iid not in blocked_on_remove] |
+ issue.blocked_on_iids = blocked_on |
+ iids_to_invalidate.update(blocked_on_add + blocked_on_remove) |
+ |
+ if blocking_add or blocking_remove: |
+ old_blocking = issue.blocking_iids |
+ blocking_add = [iid for iid in blocking_add |
+ if iid not in old_blocking] |
+ add_refs = [(ref_issue.project_name, ref_issue.local_id) |
+ for ref_issue in self.GetIssues(cnxn, blocking_add)] |
+ blocking_remove = [iid for iid in blocking_remove |
+ if iid in old_blocking] |
+ remove_refs = [ |
+ (ref_issue.project_name, ref_issue.local_id) |
+ for ref_issue in self.GetIssues(cnxn, blocking_remove)] |
+ amendments.append(tracker_bizobj.MakeBlockingAmendment( |
+ add_refs, remove_refs, default_project_name=issue.project_name)) |
+ blocking_refs = [iid for iid in old_blocking + blocking_add |
+ if iid not in blocking_remove] |
+ issue.blocking_iids = blocking_refs |
+ iids_to_invalidate.update(blocking_add + blocking_remove) |
+ |
+ if merged_into is not None and merged_into != issue.merged_into: |
+ merged_remove = issue.merged_into |
+ merged_add = merged_into |
+ issue.merged_into = merged_into |
+ try: |
+ remove_issue = self.GetIssue(cnxn, merged_remove) |
+ remove_ref = remove_issue.project_name, remove_issue.local_id |
+ iids_to_invalidate.add(merged_remove) |
+ except NoSuchIssueException: |
+ remove_ref = None |
+ |
+ try: |
+ add_issue = self.GetIssue(cnxn, merged_add) |
+ add_ref = add_issue.project_name, add_issue.local_id |
+ iids_to_invalidate.add(merged_add) |
+ except NoSuchIssueException: |
+ add_ref = None |
+ |
+ amendments.append(tracker_bizobj.MakeMergedIntoAmendment( |
+ add_ref, remove_ref, default_project_name=issue.project_name)) |
+ |
+ if summary and summary != issue.summary: |
+ amendments.append(tracker_bizobj.MakeSummaryAmendment( |
+ summary, issue.summary)) |
+ issue.summary = summary |
+ |
+ # If this was a no-op with no comment, bail out and don't save, |
+ # invalidate, or re-index anything. |
+ if not amendments and (not comment or not comment.strip()): |
+ return [], None |
+ |
+ # Note: no need to check for collisions when the user is doing a delta. |
+ |
+ # update the modified_timestamp for any comment added, even if it was |
+ # just a text comment with no issue fields changed. |
+ issue.modified_timestamp = timestamp or int(time.time()) |
+ |
+ # Update the closed timestamp before filter rules so that rules |
+ # can test for closed_timestamp, and also after filter rules |
+ # so that closed_timestamp will be set if the issue is closed by the rule. |
+ _UpdateClosedTimestamp(config, issue, old_effective_status) |
+ if rules is None: |
+ logging.info('Rules were not given') |
+ rules = services.features.GetFilterRules(cnxn, config.project_id) |
+ predicate_asts = filterrules_helpers.ParsePredicateASTs( |
+ rules, config, None) |
+ |
+ filterrules_helpers.ApplyGivenRules( |
+ cnxn, services, issue, config, rules, predicate_asts) |
+ _UpdateClosedTimestamp(config, issue, old_effective_status) |
+ |
+ # Store the issue in SQL. |
+ self.UpdateIssue(cnxn, issue, commit=False, invalidate=False) |
+ |
+ comment_pb = self.CreateIssueComment( |
+ cnxn, project_id, issue.local_id, reporter_id, comment, |
+ amendments=amendments, commit=False) |
+ self._UpdateIssuesModified( |
+ cnxn, iids_to_invalidate, modified_timestamp=issue.modified_timestamp, |
+ invalidate=invalidate) |
+ |
+ if not invalidate: |
+ cnxn.Commit() |
+ |
+ if index_now: |
+ tracker_fulltext.IndexIssues( |
+        cnxn, [issue], services.user, self, self._config_service) |
+ |
+ return amendments, comment_pb |
+ |
+ def InvalidateIIDs(self, cnxn, iids_to_invalidate): |
+ """Invalidate the specified issues in the Invalidate table and memcache.""" |
+ issues_to_invalidate = self.GetIssues(cnxn, iids_to_invalidate) |
+ self.issue_2lc.InvalidateKeys(cnxn, iids_to_invalidate) |
+ self._config_service.InvalidateMemcache(issues_to_invalidate) |
+ |
+ def ApplyIssueComment( |
+ self, cnxn, services, reporter_id, project_id, |
+ local_id, summary, status, owner_id, cc_ids, labels, field_values, |
+ component_ids, blocked_on, blocking, dangling_blocked_on_refs, |
+ dangling_blocking_refs, merged_into, index_now=True, |
+ page_gen_ts=None, comment=None, inbound_message=None, attachments=None, |
+ timestamp=None): |
+ """Update the issue in the database and return info for notifications. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ services: connection to persistence layer. |
+ reporter_id: user ID of the user making this change. |
+ project_id: int Project ID for the current project. |
+ local_id: integer local ID of the issue to update. |
+ summary: new issue summary string. |
+ status: new issue status string. |
+ owner_id: user ID of the new issue owner. |
+ cc_ids: list of user IDs of users to CC when the issue changes. |
+ labels: list of new issue label strings. |
+ field_values: list of FieldValue PBs. |
+ component_ids: list of int component IDs. |
+ blocked_on: list of IIDs that this issue is blocked on. |
+ blocking: list of IIDs that this issue is blocking. |
+ dangling_blocked_on_refs: list of Codesite issues this is blocked on. |
+ dangling_blocking_refs: list of Codesite issues this is blocking. |
+ merged_into: IID of issue that this issue was merged into, 0 to clear. |
+ index_now: True if the issue should be updated in the full text index. |
+ page_gen_ts: time at which the issue HTML page was generated, |
+ used in detecting mid-air collisions. |
+      comment: string content of the comment corresponding to this change. |
+ inbound_message: optional string full text of an email that caused |
+ this comment to be added. |
+      attachments: list of (filename, contents, mimetype) tuples for |
+          attachments uploaded at the time the comment was made. |
+ timestamp: int timestamp set during testing, otherwise defaults to |
+ int(time.time()). |
+ |
+ Returns: |
+ (amendments, comment_pb). Amendments is a list of Amendment PBs |
+ that describe the set of metadata updates that the user made. |
+ Comment_pb is the IssueComment for the change. |
+ |
+ Raises: |
+ MidAirCollisionException: indicates that the issue has been |
+ changed since the user loaded the page. |
+ """ |
+ status = framework_bizobj.CanonicalizeLabel(status) |
+ labels = [framework_bizobj.CanonicalizeLabel(l) for l in labels] |
+ labels = [l for l in labels if l] |
+ |
+ # Use canonical label names |
+ label_ids = self._config_service.LookupLabelIDs( |
+ cnxn, project_id, labels, autocreate=True) |
+ labels = [self._config_service.LookupLabel(cnxn, project_id, l_id) |
+ for l_id in label_ids] |
+ |
+ # Get the issue and project configurations. |
+ config = self._config_service.GetProjectConfig(cnxn, project_id) |
+ issue = self.GetIssueByLocalID(cnxn, project_id, local_id) |
+ |
+ # Store each updated value in the issue PB, and compute amendments |
+ amendments = [] |
+ iids_to_invalidate = set() |
+ |
+ if summary and summary != issue.summary: |
+ amendments.append(tracker_bizobj.MakeSummaryAmendment( |
+ summary, issue.summary)) |
+ issue.summary = summary |
+ |
+ old_effective_status = tracker_bizobj.GetStatus(issue) |
+ if status != issue.status: |
+ amendments.append(tracker_bizobj.MakeStatusAmendment( |
+ status, issue.status)) |
+ issue.status = status |
+ |
+ if owner_id != issue.owner_id: |
+ amendments.append(tracker_bizobj.MakeOwnerAmendment( |
+ owner_id, issue.owner_id)) |
+ if owner_id == framework_constants.NO_USER_SPECIFIED: |
+ issue.reset('owner_id') |
+ else: |
+ issue.owner_id = owner_id |
+ |
+ # TODO(jrobbins): factor the CC code into a method and add a test |
+ # compute the set of cc'd users added and removed |
+ cc_added = [cc for cc in cc_ids if cc not in issue.cc_ids] |
+ cc_removed = [cc for cc in issue.cc_ids if cc not in cc_ids] |
+ if cc_added or cc_removed: |
+ amendments.append(tracker_bizobj.MakeCcAmendment(cc_added, cc_removed)) |
+ issue.cc_ids = cc_ids |
+ |
+ # TODO(jrobbins): factor the labels code into a method and add a test |
+ # compute the set of labels added and removed |
+ labels_added = [lab for lab in labels |
+ if lab not in issue.labels] |
+ labels_removed = [lab for lab in issue.labels |
+ if lab not in labels] |
+ if labels_added or labels_removed: |
+ amendments.append(tracker_bizobj.MakeLabelsAmendment( |
+ labels_added, labels_removed)) |
+ issue.labels = labels |
+ |
+ old_field_values = collections.defaultdict(list) |
+ for ofv in issue.field_values: |
+ # Passing {} because I just want the user_id, not the email address. |
+ old_field_values[ofv.field_id].append( |
+ tracker_bizobj.GetFieldValue(ofv, {})) |
+ for field_id, values in old_field_values.iteritems(): |
+ old_field_values[field_id] = sorted(values) |
+ |
+ new_field_values = collections.defaultdict(list) |
+ for nfv in field_values: |
+ new_field_values[nfv.field_id].append( |
+ tracker_bizobj.GetFieldValue(nfv, {})) |
+ for field_id, values in new_field_values.iteritems(): |
+ new_field_values[field_id] = sorted(values) |
+ |
+ field_ids_added = {fv.field_id for fv in field_values |
+ if fv.field_id not in old_field_values} |
+ field_ids_removed = {ofv.field_id for ofv in issue.field_values |
+ if ofv.field_id not in new_field_values} |
+ field_ids_changed = { |
+ fv.field_id for fv in field_values |
+ if (fv.field_id in old_field_values and |
+ old_field_values[fv.field_id] != new_field_values[fv.field_id])} |
+ |
+ if field_ids_added or field_ids_removed or field_ids_changed: |
+ amendments.extend( |
+ tracker_bizobj.MakeFieldAmendment(fid, config, new_field_values[fid]) |
+ for fid in field_ids_added) |
+ amendments.extend( |
+ tracker_bizobj.MakeFieldAmendment( |
+ fid, config, new_field_values[fid], |
+ old_values=old_field_values[fid]) |
+ for fid in field_ids_changed) |
+ amendments.extend( |
+ tracker_bizobj.MakeFieldAmendment(fid, config, []) |
+ for fid in field_ids_removed) |
+ |
+ issue.field_values = field_values |
+ |
+ comps_added = [comp for comp in component_ids |
+ if comp not in issue.component_ids] |
+ comps_removed = [comp for comp in issue.component_ids |
+ if comp not in component_ids] |
+ if comps_added or comps_removed: |
+ amendments.append(tracker_bizobj.MakeComponentsAmendment( |
+ comps_added, comps_removed, config)) |
+ issue.component_ids = component_ids |
+ |
+ if merged_into != issue.merged_into: |
+ # TODO(jrobbins): refactor this into LookupIssueRefByIssueID(). |
+ try: |
+ merged_remove = self.GetIssue(cnxn, issue.merged_into) |
+ remove_ref = merged_remove.project_name, merged_remove.local_id |
+ iids_to_invalidate.add(issue.merged_into) |
+ except NoSuchIssueException: |
+ remove_ref = None |
+ |
+ try: |
+ merged_add = self.GetIssue(cnxn, merged_into) |
+ add_ref = merged_add.project_name, merged_add.local_id |
+ iids_to_invalidate.add(merged_into) |
+ except NoSuchIssueException: |
+ add_ref = None |
+ |
+ issue.merged_into = merged_into |
+ amendments.append(tracker_bizobj.MakeMergedIntoAmendment( |
+ add_ref, remove_ref, default_project_name=issue.project_name)) |
+ |
+ blockers_added, blockers_removed = framework_helpers.ComputeListDeltas( |
+ issue.blocked_on_iids, blocked_on) |
+ danglers_added, danglers_removed = framework_helpers.ComputeListDeltas( |
+ issue.dangling_blocked_on_refs, dangling_blocked_on_refs) |
+ blocked_add_issues = [] |
+ blocked_remove_issues = [] |
+ if blockers_added or blockers_removed or danglers_added or danglers_removed: |
+ blocked_add_issues = self.GetIssues(cnxn, blockers_added) |
+ add_refs = [(ref_issue.project_name, ref_issue.local_id) |
+ for ref_issue in blocked_add_issues] |
+ add_refs.extend([(ref.project, ref.issue_id) for ref in danglers_added]) |
+ blocked_remove_issues = self.GetIssues(cnxn, blockers_removed) |
+ remove_refs = [ |
+ (ref_issue.project_name, ref_issue.local_id) |
+ for ref_issue in blocked_remove_issues] |
+ remove_refs.extend([(ref.project, ref.issue_id) |
+ for ref in danglers_removed]) |
+ amendments.append(tracker_bizobj.MakeBlockedOnAmendment( |
+ add_refs, remove_refs, default_project_name=issue.project_name)) |
+ issue.blocked_on_iids = blocked_on |
+ issue.dangling_blocked_on_refs = dangling_blocked_on_refs |
+ iids_to_invalidate.update(blockers_added + blockers_removed) |
+ |
+ blockers_added, blockers_removed = framework_helpers.ComputeListDeltas( |
+ issue.blocking_iids, blocking) |
+ danglers_added, danglers_removed = framework_helpers.ComputeListDeltas( |
+ issue.dangling_blocking_refs, dangling_blocking_refs) |
+ blocking_add_issues = [] |
+ blocking_remove_issues = [] |
+ if blockers_added or blockers_removed or danglers_added or danglers_removed: |
+ blocking_add_issues = self.GetIssues(cnxn, blockers_added) |
+ add_refs = [(ref_issue.project_name, ref_issue.local_id) |
+ for ref_issue in blocking_add_issues] |
+ add_refs.extend([(ref.project, ref.issue_id) for ref in danglers_added]) |
+ blocking_remove_issues = self.GetIssues(cnxn, blockers_removed) |
+ remove_refs = [ |
+ (ref_issue.project_name, ref_issue.local_id) |
+ for ref_issue in blocking_remove_issues] |
+ remove_refs.extend([(ref.project, ref.issue_id) |
+ for ref in danglers_removed]) |
+ amendments.append(tracker_bizobj.MakeBlockingAmendment( |
+ add_refs, remove_refs, default_project_name=issue.project_name)) |
+ issue.blocking_iids = blocking |
+ issue.dangling_blocking_refs = dangling_blocking_refs |
+ iids_to_invalidate.update(blockers_added + blockers_removed) |
+ |
+    logging.info('later amendments so far: %r', amendments)
+ |
+ # Raise an exception if the issue was changed by another user |
+ # while this user was viewing/editing the issue. |
+ if page_gen_ts and amendments: |
+ # The issue timestamp is stored in seconds, convert to microseconds to |
+ # match the page_gen_ts. |
+ issue_ts = issue.modified_timestamp * 1000000 |
+ if issue_ts > page_gen_ts: |
+ logging.info('%d > %d', issue_ts, page_gen_ts) |
+ logging.info('amendments: %s', amendments) |
+      # Forget all the modifications made to this issue in RAM.
+ self.issue_2lc.InvalidateKeys(cnxn, [issue.issue_id]) |
+ raise MidAirCollisionException('issue %d' % local_id, local_id) |
+ |
+ # update the modified_timestamp for any comment added, even if it was |
+ # just a text comment with no issue fields changed. |
+ issue.modified_timestamp = timestamp or int(time.time()) |
+ |
+ # Update closed_timestamp both before and after filter rules. |
+ _UpdateClosedTimestamp(config, issue, old_effective_status) |
+ filterrules_helpers.ApplyFilterRules(cnxn, services, issue, config) |
+ _UpdateClosedTimestamp(config, issue, old_effective_status) |
+ |
+ self.UpdateIssue(cnxn, issue) |
+ # TODO(jrobbins): only invalidate nonviewable if the following changed: |
+ # restriction label, owner, cc, or user-type custom field. |
+ self._config_service.InvalidateMemcache([issue], key_prefix='nonviewable:') |
+ |
+ classification = services.spam.ClassifyComment(comment) |
+ |
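+    # The classifier result is assumed to look like
+    # {'outputLabel': 'spam',
+    #  'outputMulti': [{'label': 'spam', 'score': '0.99'}, ...]},
+    # based on how it is accessed below.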
+ label = classification['outputLabel'] |
+    logging.info('comment classification: %s', classification)
+ score = 0 |
+ is_spam = False |
+ for output in classification['outputMulti']: |
+ if output['label'] == label: |
+ score = float(output['score']) |
+ if label == 'spam' and score > settings.classifier_spam_thresh: |
+      logging.info('spam comment: %s', comment)
+ is_spam = True |
+ |
+ if amendments or (comment and comment.strip()) or attachments: |
+ logging.info('amendments = %r', amendments) |
+ comment_pb = self.CreateIssueComment( |
+ cnxn, project_id, local_id, reporter_id, comment, |
+ amendments=amendments, attachments=attachments, |
+ inbound_message=inbound_message, is_spam=is_spam) |
+ services.spam.RecordClassifierCommentVerdict( |
+ cnxn, comment_pb, is_spam, score) |
+ else: |
+ comment_pb = None |
+ |
+ # Add a comment to the newly added issues saying they are now blocking |
+ # this issue. |
+ for add_issue in blocked_add_issues: |
+ self.CreateIssueComment( |
+ cnxn, add_issue.project_id, add_issue.local_id, reporter_id, |
+ content='', |
+ amendments=[tracker_bizobj.MakeBlockingAmendment( |
+ [(issue.project_name, issue.local_id)], [], |
+ default_project_name=add_issue.project_name)]) |
+ # Add a comment to the newly removed issues saying they are no longer |
+ # blocking this issue. |
+ for remove_issue in blocked_remove_issues: |
+ self.CreateIssueComment( |
+ cnxn, remove_issue.project_id, remove_issue.local_id, reporter_id, |
+ content='', |
+ amendments=[tracker_bizobj.MakeBlockingAmendment( |
+ [], [(issue.project_name, issue.local_id)], |
+ default_project_name=remove_issue.project_name)]) |
+ |
+ # Add a comment to the newly added issues saying they are now blocked on |
+ # this issue. |
+ for add_issue in blocking_add_issues: |
+ self.CreateIssueComment( |
+ cnxn, add_issue.project_id, add_issue.local_id, reporter_id, |
+ content='', |
+ amendments=[tracker_bizobj.MakeBlockedOnAmendment( |
+ [(issue.project_name, issue.local_id)], [], |
+ default_project_name=add_issue.project_name)]) |
+ # Add a comment to the newly removed issues saying they are no longer |
+ # blocked on this issue. |
+ for remove_issue in blocking_remove_issues: |
+ self.CreateIssueComment( |
+ cnxn, remove_issue.project_id, remove_issue.local_id, reporter_id, |
+ content='', |
+ amendments=[tracker_bizobj.MakeBlockedOnAmendment( |
+ [], [(issue.project_name, issue.local_id)], |
+ default_project_name=remove_issue.project_name)]) |
+ |
+ self._UpdateIssuesModified( |
+ cnxn, iids_to_invalidate, modified_timestamp=issue.modified_timestamp) |
+ |
+ if index_now: |
+ tracker_fulltext.IndexIssues( |
+ cnxn, [issue], services.user, self, self._config_service) |
+ |
+ if is_spam: |
+ sequence_num = len(self.GetCommentsForIssue(cnxn, issue.issue_id)) - 1 |
+ # Soft-deletes have to have a user ID, so spam comments are |
+ # just "deleted" by the commenter. |
+ self.SoftDeleteComment(cnxn, project_id, local_id, sequence_num, |
+ reporter_id, services.user, is_spam=True) |
+ return amendments, comment_pb |
+ |
+ def RelateIssues(self, cnxn, issue_relation_dict, commit=True): |
+ """Update the IssueRelation table rows for the given relationships. |
+ |
+    Args:
+      cnxn: connection to SQL database.
+      issue_relation_dict: dict mapping each source issue ID to a list of
+        (destination issue ID, kind) tuples, where kind is 'blocking',
+        'blockedon', or 'mergedinto'.
+      commit: set to False to avoid doing the commit for now.
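+
+    For example (hypothetical issue IDs), {100: [(200, 'blocking')]} stores
+    the single row (200, 100, 'blockedon'): 'blocking' relations are
+    inverted and stored as 'blockedon' rows.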
+ """ |
+ relation_rows = [] |
+ for src_iid, dests in issue_relation_dict.iteritems(): |
+ for dst_iid, kind in dests: |
+ if kind == 'blocking': |
+ relation_rows.append((dst_iid, src_iid, 'blockedon')) |
+        elif kind in ('blockedon', 'mergedinto'):
+          relation_rows.append((src_iid, dst_iid, kind))
+ |
+ self.issuerelation_tbl.InsertRows( |
+ cnxn, ISSUERELATION_COLS, relation_rows, ignore=True, commit=commit) |
+ |
+ def CopyIssues(self, cnxn, dest_project, issues, user_service, copier_id): |
+ """Copy the given issues into the destination project.""" |
+ created_issues = [] |
+ iids_to_invalidate = set() |
+ |
+ for target_issue in issues: |
+ new_issue = tracker_pb2.Issue() |
+ new_issue.project_id = dest_project.project_id |
+ new_issue.project_name = dest_project.project_name |
+ new_issue.summary = target_issue.summary |
+ new_issue.labels.extend(target_issue.labels) |
+ new_issue.field_values.extend(target_issue.field_values) |
+ new_issue.reporter_id = copier_id |
+ |
+ timestamp = int(time.time()) |
+ new_issue.opened_timestamp = timestamp |
+ new_issue.modified_timestamp = timestamp |
+ |
+ target_comments = self.GetCommentsForIssue(cnxn, target_issue.issue_id) |
+ initial_summary_comment = target_comments[0] |
+ |
+ # Note that blocking and merge_into are not copied. |
+ if target_issue.blocked_on_iids: |
+ blocked_on = target_issue.blocked_on_iids |
+ iids_to_invalidate.update(blocked_on) |
+ new_issue.blocked_on_iids = blocked_on |
+ |
+      # Gather list of attachments from the target issue's summary comment.
+      # _MakeIssueComment expects a list of [(filename, contents, mimetype)].
+ attachments = [] |
+ for attachment in initial_summary_comment.attachments: |
+ object_path = ('/' + app_identity.get_default_gcs_bucket_name() + |
+ attachment.gcs_object_id) |
+ with cloudstorage.open(object_path, 'r') as f: |
+ content = f.read() |
+ attachments.append( |
+ [attachment.filename, content, attachment.mimetype]) |
+ |
+ if attachments: |
+ new_issue.attachment_count = len(attachments) |
+ |
+ # Create the same summary comment as the target issue. |
+ comment = self._MakeIssueComment( |
+ dest_project.project_id, copier_id, initial_summary_comment.content, |
+ attachments=attachments, timestamp=timestamp, was_escaped=True) |
+ |
+ new_issue.local_id = self.AllocateNextLocalID( |
+ cnxn, dest_project.project_id) |
+ issue_id = self.InsertIssue(cnxn, new_issue) |
+ comment.issue_id = issue_id |
+ self.InsertComment(cnxn, comment) |
+ |
+ if permissions.HasRestrictions(new_issue, 'view'): |
+ self._config_service.InvalidateMemcache( |
+ [new_issue], key_prefix='nonviewable:') |
+ |
+ tracker_fulltext.IndexIssues( |
+ cnxn, [new_issue], user_service, self, self._config_service) |
+ created_issues.append(new_issue) |
+ |
+ # The referenced issues are all modified when the relationship is added. |
+ self._UpdateIssuesModified( |
+ cnxn, iids_to_invalidate, modified_timestamp=timestamp) |
+ |
+ return created_issues |
+ |
+ def MoveIssues(self, cnxn, dest_project, issues, user_service): |
+ """Move the given issues into the destination project.""" |
+ old_location_rows = [ |
+ (issue.issue_id, issue.project_id, issue.local_id) |
+ for issue in issues] |
+ moved_back_iids = set() |
+ |
+ former_locations_in_project = self.issueformerlocations_tbl.Select( |
+ cnxn, cols=ISSUEFORMERLOCATIONS_COLS, |
+ project_id=dest_project.project_id, |
+ issue_id=[issue.issue_id for issue in issues]) |
+ former_locations = { |
+ issue_id: local_id |
+ for issue_id, project_id, local_id in former_locations_in_project} |
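+    # Issues that are moving back into a project they were in before will
+    # reuse their original local IDs there, rather than newly allocated
+    # ones.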
+ |
+    # Remove the moved issues from issue_id_2lc so that their old
+    # (project_id, local_id) keys do not stay around in cache and memcache.
+    # The key of IssueIDTwoLevelCache is (project_id, local_id).
+    issue_id_2lc_keys = [
+        (issue.project_id, issue.local_id) for issue in issues]
+    self.issue_id_2lc.InvalidateKeys(cnxn, issue_id_2lc_keys)
+ |
+ for issue in issues: |
+ if issue.issue_id in former_locations: |
+ dest_id = former_locations[issue.issue_id] |
+ moved_back_iids.add(issue.issue_id) |
+ else: |
+ dest_id = self.AllocateNextLocalID(cnxn, dest_project.project_id) |
+ |
+ issue.local_id = dest_id |
+ issue.project_id = dest_project.project_id |
+ issue.project_name = dest_project.project_name |
+ |
+ # Rewrite each whole issue so that status and label IDs are looked up |
+ # in the context of the destination project. |
+ self.UpdateIssues(cnxn, issues) |
+ |
+ # Comments also have the project_id because it is needed for an index. |
+ self.comment_tbl.Update( |
+ cnxn, {'project_id': dest_project.project_id}, |
+ issue_id=[issue.issue_id for issue in issues], commit=False) |
+ |
+ # Record old locations so that we can offer links if the user looks there. |
+ self.issueformerlocations_tbl.InsertRows( |
+ cnxn, ISSUEFORMERLOCATIONS_COLS, old_location_rows, ignore=True, |
+ commit=False) |
+ cnxn.Commit() |
+ |
+ tracker_fulltext.IndexIssues( |
+ cnxn, issues, user_service, self, self._config_service) |
+ |
+ return moved_back_iids |
+ |
+ def ExpungeFormerLocations(self, cnxn, project_id): |
+ """Delete history of issues that were in this project but moved out.""" |
+ self.issueformerlocations_tbl.Delete(cnxn, project_id=project_id) |
+ |
+ def ExpungeIssues(self, cnxn, issue_ids): |
+ """Completely delete the specified issues from the database.""" |
+ logging.info('expunging the issues %r', issue_ids) |
+ tracker_fulltext.UnindexIssues(issue_ids) |
+ |
+ remaining_iids = issue_ids[:] |
+ |
+ # Note: these are purposely not done in a transaction to allow |
+ # incremental progress in what might be a very large change. |
+ # We are not concerned about non-atomic deletes because all |
+ # this data will be gone eventually anyway. |
+ while remaining_iids: |
+ iids_in_chunk = remaining_iids[:CHUNK_SIZE] |
+ remaining_iids = remaining_iids[CHUNK_SIZE:] |
+ self.issuesummary_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.issue2label_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.issue2component_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.issue2cc_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.issue2notify_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.issueupdate_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.attachment_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.comment_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.issuerelation_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.issuerelation_tbl.Delete(cnxn, dst_issue_id=iids_in_chunk) |
+ self.danglingrelation_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.issueformerlocations_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.reindexqueue_tbl.Delete(cnxn, issue_id=iids_in_chunk) |
+ self.issue_tbl.Delete(cnxn, id=iids_in_chunk) |
+ |
+ def SoftDeleteIssue(self, cnxn, project_id, local_id, deleted, user_service): |
+ """Set the deleted boolean on the indicated issue and store it. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: int project ID for the current project. |
+ local_id: int local ID of the issue to freeze/unfreeze. |
+ deleted: boolean, True to soft-delete, False to undelete. |
+ user_service: persistence layer for users, used to lookup user IDs. |
+ """ |
+ issue = self.GetIssueByLocalID(cnxn, project_id, local_id) |
+ issue.deleted = deleted |
+ self.UpdateIssue(cnxn, issue, update_cols=['deleted']) |
+ tracker_fulltext.IndexIssues( |
+ cnxn, [issue], user_service, self, self._config_service) |
+ |
+ def DeleteComponentReferences(self, cnxn, component_id): |
+ """Delete any references to the specified component.""" |
+ # TODO(jrobbins): add tasks to re-index any affected issues. |
+ # Note: if this call fails, some data could be left |
+ # behind, but it would not be displayed, and it could always be |
+ # GC'd from the DB later. |
+ self.issue2component_tbl.Delete(cnxn, component_id=component_id) |
+ |
+ ### Local ID generation |
+ |
+ def InitializeLocalID(self, cnxn, project_id): |
+ """Initialize the local ID counter for the specified project to zero. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: int ID of the project. |
+ """ |
+ self.localidcounter_tbl.InsertRow( |
+ cnxn, project_id=project_id, used_local_id=0, used_spam_id=0) |
+ |
+ def SetUsedLocalID(self, cnxn, project_id): |
+ """Set the local ID counter based on existing issues. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: int ID of the project. |
+ """ |
+ highest_id = self.GetHighestLocalID(cnxn, project_id) |
+ self.localidcounter_tbl.Update( |
+ cnxn, {'used_local_id': highest_id}, project_id=project_id) |
+ return highest_id |
+ |
+ def AllocateNextLocalID(self, cnxn, project_id): |
+ """Return the next available issue ID in the specified project. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: int ID of the project. |
+ |
+ Returns: |
+ The next local ID. |
+ """ |
+ try: |
+ next_local_id = self.localidcounter_tbl.IncrementCounterValue( |
+ cnxn, 'used_local_id', project_id=project_id) |
+ except AssertionError: |
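+      # The increment is expected to fail when the counter row is missing
+      # or stale; recompute the counter from existing issues and use the
+      # next value after it.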
+ next_local_id = self.SetUsedLocalID(cnxn, project_id) + 1 |
+ return next_local_id |
+ |
+ def SetUsedSpamID(self, cnxn, project_id): |
+ """Set the local ID counter based on existing issues. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: int ID of the project. |
+ """ |
+ current_id = self.localidcounter_tbl.SelectValue( |
+ cnxn, 'used_spam_id', project_id=project_id) |
+ current_id = current_id or 0 # Will be None if project has no issues. |
+ |
+ self.localidcounter_tbl.Update( |
+ cnxn, {'used_spam_id': current_id + 1}, project_id=project_id) |
+ return current_id + 1 |
+ |
+ def AllocateNextSpamLocalID(self, cnxn, project_id): |
+ """Return the next available spam issue ID in the specified project. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: int ID of the project. |
+ |
+ Returns: |
+      The next spam local ID, as a negative number.
+ """ |
+ try: |
+ next_spam_id = self.localidcounter_tbl.IncrementCounterValue( |
+ cnxn, 'used_spam_id', project_id=project_id) |
+ except AssertionError: |
+ next_spam_id = self.SetUsedSpamID(cnxn, project_id) + 1 |
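+    # Spam issues get negative local IDs, keeping them out of the normal
+    # local ID space.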
+ return -next_spam_id |
+ |
+ def GetHighestLocalID(self, cnxn, project_id): |
+ """Return the highest used issue ID in the specified project. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: int ID of the project. |
+ |
+ Returns: |
+      The highest local ID of any active or moved issue.
+ """ |
+ highest = self.issue_tbl.SelectValue( |
+ cnxn, 'MAX(local_id)', project_id=project_id) |
+ highest = highest or 0 # It will be None if the project has no issues. |
+ highest_former = self.issueformerlocations_tbl.SelectValue( |
+ cnxn, 'MAX(local_id)', project_id=project_id) |
+ highest_former = highest_former or 0 |
+ return max(highest, highest_former) |
+ |
+ def GetAllLocalIDsInProject(self, cnxn, project_id, min_local_id=None): |
+ """Return the list of local IDs only, not the actual issues. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: the ID of the project to which the issue belongs. |
+ min_local_id: point to start at. |
+ |
+ Returns: |
+      A list of local IDs from 1 to N, or from min_local_id to N. Some of
+      those local IDs may no longer be in use, e.g., if issues were moved
+      out of this project.
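+
+      For example, if the highest used local ID is 3 and min_local_id is
+      not given, this returns [1, 2, 3].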
+ """ |
+ if not min_local_id: |
+ min_local_id = 1 |
+ highest_local_id = self.GetHighestLocalID(cnxn, project_id) |
+ return range(min_local_id, highest_local_id + 1) |
+ |
+ def ExpungeLocalIDCounters(self, cnxn, project_id): |
+ """Delete history of local ids that were in this project.""" |
+ self.localidcounter_tbl.Delete(cnxn, project_id=project_id) |
+ |
+ ### Comments |
+ |
+ def _UnpackComment(self, comment_row): |
+ """Partially construct a Comment PB from a DB row.""" |
+ (comment_id, issue_id, created, project_id, commenter_id, content, |
+ inbound_message, was_escaped, deleted_by, is_spam) = comment_row |
+ comment = tracker_pb2.IssueComment() |
+ comment.id = comment_id |
+ comment.issue_id = issue_id |
+ comment.timestamp = created |
+ comment.project_id = project_id |
+ comment.user_id = commenter_id |
+ comment.content = content or '' |
+ comment.inbound_message = inbound_message or '' |
+ comment.was_escaped = bool(was_escaped) |
+ comment.deleted_by = deleted_by or 0 |
+ comment.is_spam = bool(is_spam) |
+ return comment |
+ |
+ def _UnpackAmendment(self, amendment_row): |
+ """Construct an Amendment PB from a DB row.""" |
+ (_id, _issue_id, comment_id, field_name, |
+ old_value, new_value, added_user_id, removed_user_id, |
+ custom_field_name) = amendment_row |
+ amendment = tracker_pb2.Amendment() |
+ field_enum = tracker_pb2.FieldID(field_name.upper()) |
+ amendment.field = field_enum |
+ |
+ # TODO(jrobbins): display old values in more cases. |
+ if new_value is not None: |
+ amendment.newvalue = new_value |
+ if old_value is not None: |
+ amendment.oldvalue = old_value |
+ if added_user_id: |
+ amendment.added_user_ids.append(added_user_id) |
+ if removed_user_id: |
+ amendment.removed_user_ids.append(removed_user_id) |
+ if custom_field_name: |
+ amendment.custom_field_name = custom_field_name |
+ return amendment, comment_id |
+ |
+ def _ConsolidateAmendments(self, amendments): |
+ """Consoliodate amendments of the same field in one comment into one |
+ amendment PB.""" |
+ |
+ fields_dict = {} |
+ result = [] |
+ |
+ for amendment in amendments: |
+ fields_dict.setdefault(amendment.field, []).append(amendment) |
+    for field, field_amendments in fields_dict.iteritems():
+      new_amendment = tracker_pb2.Amendment()
+      new_amendment.field = field
+      for amendment in field_amendments:
+ if amendment.newvalue is not None: |
+ new_amendment.newvalue = amendment.newvalue |
+ if amendment.oldvalue is not None: |
+ new_amendment.oldvalue = amendment.oldvalue |
+ if amendment.added_user_ids: |
+ new_amendment.added_user_ids.extend(amendment.added_user_ids) |
+ if amendment.removed_user_ids: |
+ new_amendment.removed_user_ids.extend(amendment.removed_user_ids) |
+ if amendment.custom_field_name: |
+ new_amendment.custom_field_name = amendment.custom_field_name |
+ result.append(new_amendment) |
+ return result |
+ |
+ def _UnpackAttachment(self, attachment_row): |
+ """Construct an Attachment PB from a DB row.""" |
+ (attachment_id, _issue_id, comment_id, filename, filesize, mimetype, |
+ deleted, gcs_object_id) = attachment_row |
+ attach = tracker_pb2.Attachment() |
+ attach.attachment_id = attachment_id |
+ attach.filename = filename |
+ attach.filesize = filesize |
+ attach.mimetype = mimetype |
+ attach.deleted = bool(deleted) |
+ attach.gcs_object_id = gcs_object_id |
+ return attach, comment_id |
+ |
+ def _DeserializeComments( |
+ self, comment_rows, amendment_rows, attachment_rows): |
+ """Turn rows into IssueComment PBs.""" |
+ results = [] # keep objects in the same order as the rows |
+ results_dict = {} # for fast access when joining. |
+ |
+ for comment_row in comment_rows: |
+ comment = self._UnpackComment(comment_row) |
+ results.append(comment) |
+ results_dict[comment.id] = comment |
+ |
+ for amendment_row in amendment_rows: |
+ amendment, comment_id = self._UnpackAmendment(amendment_row) |
+ try: |
+        results_dict[comment_id].amendments.append(amendment)
+ except KeyError: |
+ logging.error('Found amendment for missing comment: %r', comment_id) |
+ |
+ for attachment_row in attachment_rows: |
+ attach, comment_id = self._UnpackAttachment(attachment_row) |
+ try: |
+ results_dict[comment_id].attachments.append(attach) |
+ except KeyError: |
+ logging.error('Found attachment for missing comment: %r', comment_id) |
+ |
+ for c in results: |
+ c.amendments = self._ConsolidateAmendments(c.amendments) |
+ |
+ return results |
+ |
+ # TODO(jrobbins): make this a private method and expose just the interface |
+ # needed by activities.py. |
+ def GetComments(self, cnxn, where=None, order_by=None, **kwargs): |
+ """Retrieve comments from SQL.""" |
+ # Explicitly specify column Comment.id to allow joins on other tables that |
+ # have an id column. |
+ order_by = order_by or [('created', [])] |
+ comment_rows = self.comment_tbl.Select( |
+ cnxn, cols=COMMENT_COLS, where=where, |
+ order_by=order_by, **kwargs) |
+ cids = [row[0] for row in comment_rows] |
+ amendment_rows = self.issueupdate_tbl.Select( |
+ cnxn, cols=ISSUEUPDATE_COLS, comment_id=cids) |
+ attachment_rows = self.attachment_tbl.Select( |
+ cnxn, cols=ATTACHMENT_COLS, comment_id=cids) |
+ |
+ comments = self._DeserializeComments( |
+ comment_rows, amendment_rows, attachment_rows) |
+ return comments |
+ |
+ def GetComment(self, cnxn, comment_id): |
+ """Get the requested comment, or raise an exception.""" |
+ comments = self.GetComments(cnxn, id=comment_id) |
+ try: |
+ return comments[0] |
+ except IndexError: |
+ raise NoSuchCommentException() |
+ |
+ def GetCommentsForIssue(self, cnxn, issue_id): |
+ """Return all IssueComment PBs for the specified issue. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ issue_id: int global ID of the issue. |
+ |
+ Returns: |
+ A list of the IssueComment protocol buffers for the description |
+ and comments on this issue. |
+ """ |
+ comments = self.GetComments(cnxn, issue_id=[issue_id]) |
+ for i, comment in enumerate(comments): |
+ comment.sequence = i |
+ |
+ return comments |
+ |
+ def GetCommentsByID(self, cnxn, comment_ids, sequences): |
+ """Return all IssueComment PBs by comment ids. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ comment_ids: a list of comment ids. |
+ sequences: sequence of the comments. |
+ |
+ Returns: |
+ A list of the IssueComment protocol buffers for the description |
+ and comments on this issue. |
+ """ |
+ order_by = [('created ASC', [])] |
+ comment_rows = self.comment_tbl.Select( |
+ cnxn, cols=COMMENT_COLS, order_by=order_by, id=comment_ids) |
+ amendment_rows = self.issueupdate_tbl.Select( |
+ cnxn, cols=ISSUEUPDATE_COLS, comment_id=comment_ids) |
+ attachment_rows = self.attachment_tbl.Select( |
+ cnxn, cols=ATTACHMENT_COLS, comment_id=comment_ids) |
+ |
+ comments = self._DeserializeComments( |
+ comment_rows, amendment_rows, attachment_rows) |
+ |
+ for i in xrange(len(comment_ids)): |
+ comments[i].sequence = sequences[i] |
+ |
+ return comments |
+ |
+ def GetAbbrCommentsForIssue(self, cnxn, issue_id): |
+ """Get all abbreviated comments for the specified issue.""" |
+ order_by = [('created ASC', [])] |
+ comment_rows = self.comment_tbl.Select( |
+ cnxn, cols=ABBR_COMMENT_COLS, issue_id=[issue_id], order_by=order_by) |
+ |
+ return comment_rows |
+ |
+  # TODO(jrobbins): remove this method because it is too slow when an issue
+  # has a huge number of comments.
+ def GetCommentsForIssues(self, cnxn, issue_ids): |
+ """Return all IssueComment PBs for each issue ID in the given list. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ issue_ids: list of integer global issue IDs. |
+ |
+ Returns: |
+ Dict {issue_id: [IssueComment, ...]} with IssueComment protocol |
+ buffers for the description and comments on each issue. |
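+
+      For example (hypothetical issue IDs), a call with issue_ids=[100, 200]
+      returns a dict with keys 100 and 200, each mapped to that issue's
+      comments in order, with comment.sequence set to the comment's index.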
+ """ |
+ comments = self.GetComments(cnxn, issue_id=issue_ids) |
+ |
+ comments_dict = collections.defaultdict(list) |
+ for comment in comments: |
+ comment.sequence = len(comments_dict[comment.issue_id]) |
+ comments_dict[comment.issue_id].append(comment) |
+ |
+ return comments_dict |
+ |
+ def InsertComment(self, cnxn, comment, commit=True): |
+ """Store the given issue comment in SQL. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ comment: IssueComment PB to insert into the database. |
+ commit: set to False to avoid doing the commit for now. |
+ """ |
+ comment_id = self.comment_tbl.InsertRow( |
+ cnxn, issue_id=comment.issue_id, created=comment.timestamp, |
+ project_id=comment.project_id, |
+ commenter_id=comment.user_id, content=comment.content, |
+ inbound_message=comment.inbound_message, |
+ was_escaped=comment.was_escaped, |
+ deleted_by=comment.deleted_by or None, |
+ is_spam=comment.is_spam, |
+ commit=commit) |
+ comment.id = comment_id |
+ |
+ amendment_rows = [] |
+ for amendment in comment.amendments: |
+ field_enum = str(amendment.field).lower() |
+ if (amendment.get_assigned_value('newvalue') is not None and |
+ not amendment.added_user_ids and not amendment.removed_user_ids): |
+ amendment_rows.append(( |
+ comment.issue_id, comment_id, field_enum, |
+ amendment.oldvalue, amendment.newvalue, |
+ None, None, amendment.custom_field_name)) |
+ for added_user_id in amendment.added_user_ids: |
+ amendment_rows.append(( |
+ comment.issue_id, comment_id, field_enum, None, None, |
+ added_user_id, None, amendment.custom_field_name)) |
+ for removed_user_id in amendment.removed_user_ids: |
+ amendment_rows.append(( |
+ comment.issue_id, comment_id, field_enum, None, None, |
+ None, removed_user_id, amendment.custom_field_name)) |
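+    # For example (hypothetical user IDs), a CC amendment that adds users
+    # 111 and 222 becomes two rows, one per added_user_id, with the old/new
+    # value columns set to None.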
+ # ISSUEUPDATE_COLS[1:] to skip id column. |
+ self.issueupdate_tbl.InsertRows( |
+ cnxn, ISSUEUPDATE_COLS[1:], amendment_rows, commit=commit) |
+ |
+ attachment_rows = [] |
+ for attach in comment.attachments: |
+ attachment_rows.append([ |
+ comment.issue_id, comment.id, attach.filename, attach.filesize, |
+ attach.mimetype, attach.deleted, attach.gcs_object_id]) |
+ self.attachment_tbl.InsertRows( |
+ cnxn, ATTACHMENT_COLS[1:], attachment_rows, commit=commit) |
+ |
+ def _UpdateComment(self, cnxn, comment, update_cols=None): |
+ """Update the given issue comment in SQL. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ comment: IssueComment PB to update in the database. |
+ update_cols: optional list of just the field names to update. |
+ """ |
+ delta = { |
+ 'commenter_id': comment.user_id, |
+ 'content': comment.content, |
+ 'deleted_by': comment.deleted_by or None, |
+ 'is_spam': comment.is_spam, |
+ } |
+ if update_cols is not None: |
+ delta = {key: val for key, val in delta.iteritems() |
+ if key in update_cols} |
+ |
+ self.comment_tbl.Update(cnxn, delta, id=comment.id) |
+ |
+ def _MakeIssueComment( |
+ self, project_id, user_id, content, inbound_message=None, |
+ amendments=None, attachments=None, timestamp=None, was_escaped=False, |
+ is_spam=False): |
+ """Create in IssueComment protocol buffer in RAM. |
+ |
+ Args: |
+ project_id: Project with the issue. |
+ user_id: the user ID of the user who entered the comment. |
+ content: string body of the comment. |
+ inbound_message: optional string full text of an email that |
+ caused this comment to be added. |
+ amendments: list of Amendment PBs describing the |
+ metadata changes that the user made along w/ comment. |
+ attachments: [(filename, contents, mimetype),...] attachments uploaded at |
+ the time the comment was made. |
+ timestamp: time at which the comment was made, defaults to now. |
+ was_escaped: True if the comment was HTML escaped already. |
+ is_spam: True if the comment was classified as spam. |
+ Returns: |
+ The new IssueComment protocol buffer. |
+ |
+ The content may have some markup done during input processing. |
+ |
+ Any attachments are immediately stored. |
+ """ |
+ comment = tracker_pb2.IssueComment() |
+ comment.project_id = project_id |
+ comment.user_id = user_id |
+ comment.content = content or '' |
+ comment.was_escaped = was_escaped |
+ comment.is_spam = is_spam |
+ if not timestamp: |
+ timestamp = int(time.time()) |
+ comment.timestamp = int(timestamp) |
+ if inbound_message: |
+ comment.inbound_message = inbound_message |
+ if amendments: |
+ logging.info('amendments is %r', amendments) |
+ comment.amendments.extend(amendments) |
+ |
+ if attachments: |
+ for filename, body, mimetype in attachments: |
+ gcs_object_id = gcs_helpers.StoreObjectInGCS(body, mimetype, project_id) |
+ attach = tracker_pb2.Attachment() |
+ # attachment id is determined later by the SQL DB. |
+ attach.filename = filename |
+ attach.filesize = len(body) |
+ attach.mimetype = mimetype |
+ attach.gcs_object_id = gcs_object_id |
+ comment.attachments.extend([attach]) |
+ logging.info("Save attachment with object_id: %s" % gcs_object_id) |
+ |
+ return comment |
+ |
+ def CreateIssueComment( |
+ self, cnxn, project_id, local_id, user_id, content, inbound_message=None, |
+ amendments=None, attachments=None, timestamp=None, is_spam=False, |
+ commit=True): |
+ """Create and store a new comment on the specified issue. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ project_id: int ID of the current Project. |
+ local_id: the issue on which to add the comment. |
+ user_id: the user ID of the user who entered the comment. |
+ content: string body of the comment. |
+ inbound_message: optional string full text of an email that caused |
+ this comment to be added. |
+ amendments: list of Amendment PBs describing the |
+ metadata changes that the user made along w/ comment. |
+ attachments: [(filename, contents, mimetype),...] attachments uploaded at |
+ the time the comment was made. |
+ timestamp: time at which the comment was made, defaults to now. |
+ is_spam: True if the comment is classified as spam. |
+ commit: set to False to not commit to DB yet. |
+ |
+ Returns: |
+ The new IssueComment protocol buffer. |
+ |
+ Note that we assume that the content is safe to echo out |
+ again. The content may have some markup done during input |
+ processing. |
+ """ |
+ issue = self.GetIssueByLocalID(cnxn, project_id, local_id) |
+ |
+ comment = self._MakeIssueComment( |
+ issue.project_id, user_id, content, amendments=amendments, |
+ inbound_message=inbound_message, attachments=attachments, |
+ timestamp=timestamp, is_spam=is_spam) |
+ comment.issue_id = issue.issue_id |
+ |
+ if attachments: |
+ issue.attachment_count = issue.attachment_count + len(attachments) |
+ self.UpdateIssue(cnxn, issue, update_cols=['attachment_count']) |
+ |
+ self.InsertComment(cnxn, comment, commit=commit) |
+ |
+ return comment |
+ |
+ def SoftDeleteComment( |
+ self, cnxn, project_id, local_id, sequence_num, deleted_by_user_id, |
+ user_service, delete=True, reindex=True, is_spam=False): |
+ """Mark comment as un/deleted, which shows/hides it from average users.""" |
+ issue = self.GetIssueByLocalID(cnxn, project_id, local_id) |
+ |
+ all_comments = self.GetCommentsForIssue(cnxn, issue.issue_id) |
+ try: |
+ issue_comment = all_comments[sequence_num] |
+ except IndexError: |
+ logging.warning( |
+ 'Tried to (un)delete non-existent comment #%s in issue %s:%s', |
+ sequence_num, project_id, local_id) |
+ return |
+ |
+    # Count the comment's attachments that are not deleted.
+ attachments = 0 |
+ if issue_comment.attachments: |
+ for attachment in issue_comment.attachments: |
+ if not attachment.deleted: |
+ attachments += 1 |
+ |
+ # Delete only if it's not in deleted state |
+ if delete: |
+ if not issue_comment.deleted_by: |
+ issue_comment.deleted_by = deleted_by_user_id |
+ issue.attachment_count = issue.attachment_count - attachments |
+ |
+ # Undelete only if it's in deleted state |
+ elif issue_comment.deleted_by: |
+ issue_comment.deleted_by = 0 |
+ issue.attachment_count = issue.attachment_count + attachments |
+ |
+ issue_comment.is_spam = is_spam |
+ self._UpdateComment( |
+ cnxn, issue_comment, update_cols=['deleted_by', 'is_spam']) |
+ self.UpdateIssue(cnxn, issue, update_cols=['attachment_count']) |
+ |
+ # Reindex the issue to take the comment deletion/undeletion into account. |
+ if reindex: |
+ tracker_fulltext.IndexIssues( |
+ cnxn, [issue], user_service, self, self._config_service) |
+ |
+ ### Attachments |
+ |
+ def GetAttachmentAndContext(self, cnxn, attachment_id): |
+ """Load a IssueAttachment from database, and its comment ID and IID. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ attachment_id: long integer unique ID of desired issue attachment. |
+ |
+ Returns: |
+      An Attachment protocol buffer that contains metadata about the
+      attached file, along with the comment ID and issue ID of the comment
+      and issue that contain this attachment.
+ |
+ Raises: |
+ NoSuchAttachmentException: the attachment was not found. |
+ """ |
+ if attachment_id is None: |
+ raise NoSuchAttachmentException() |
+ |
+ attachment_row = self.attachment_tbl.SelectRow( |
+ cnxn, cols=ATTACHMENT_COLS, id=attachment_id) |
+ if attachment_row: |
+ (attach_id, issue_id, comment_id, filename, filesize, mimetype, |
+ deleted, gcs_object_id) = attachment_row |
+ if not deleted: |
+ attachment = tracker_pb2.Attachment( |
+ attachment_id=attach_id, filename=filename, filesize=filesize, |
+ mimetype=mimetype, deleted=bool(deleted), |
+ gcs_object_id=gcs_object_id) |
+ return attachment, comment_id, issue_id |
+ |
+ raise NoSuchAttachmentException() |
+ |
+ def _UpdateAttachment(self, cnxn, attach, update_cols=None): |
+ """Update attachment metadata in the DB. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ attach: IssueAttachment PB to update in the DB. |
+ update_cols: optional list of just the field names to update. |
+ """ |
+ delta = { |
+ 'filename': attach.filename, |
+ 'filesize': attach.filesize, |
+ 'mimetype': attach.mimetype, |
+ 'deleted': bool(attach.deleted), |
+ } |
+ if update_cols is not None: |
+ delta = {key: val for key, val in delta.iteritems() |
+ if key in update_cols} |
+ |
+ self.attachment_tbl.Update(cnxn, delta, id=attach.attachment_id) |
+ |
+ def SoftDeleteAttachment( |
+ self, cnxn, project_id, local_id, seq_num, attach_id, user_service, |
+ delete=True, index_now=True): |
+ """Mark attachment as un/deleted, which shows/hides it from avg users.""" |
+ issue = self.GetIssueByLocalID(cnxn, project_id, local_id) |
+ all_comments = self.GetCommentsForIssue(cnxn, issue.issue_id) |
+ try: |
+ issue_comment = all_comments[seq_num] |
+ except IndexError: |
+ logging.warning( |
+ 'Tried to (un)delete attachment on non-existent comment #%s in ' |
+ 'issue %s:%s', seq_num, project_id, local_id) |
+ return |
+ |
+ attachment = None |
+ for attach in issue_comment.attachments: |
+ if attach.attachment_id == attach_id: |
+ attachment = attach |
+ |
+ if not attachment: |
+ logging.warning( |
+ 'Tried to (un)delete non-existent attachment #%s in project ' |
+ '%s issue %s', attach_id, project_id, local_id) |
+ return |
+ |
+ if not issue_comment.deleted_by: |
+ # Decrement attachment count only if it's not in deleted state |
+ if delete: |
+ if not attachment.deleted: |
+ issue.attachment_count = issue.attachment_count - 1 |
+ |
+ # Increment attachment count only if it's in deleted state |
+ elif attachment.deleted: |
+ issue.attachment_count = issue.attachment_count + 1 |
+ |
+ attachment.deleted = delete |
+ |
+ self._UpdateAttachment(cnxn, attachment, update_cols=['deleted']) |
+ self.UpdateIssue(cnxn, issue, update_cols=['attachment_count']) |
+ |
+ if index_now: |
+ tracker_fulltext.IndexIssues( |
+ cnxn, [issue], user_service, self, self._config_service) |
+ |
+ ### Reindex queue |
+ |
+ def EnqueueIssuesForIndexing(self, cnxn, issue_ids): |
+ """Add the given issue IDs to the ReindexQueue table.""" |
+ reindex_rows = [(issue_id,) for issue_id in issue_ids] |
+ self.reindexqueue_tbl.InsertRows( |
+ cnxn, ['issue_id'], reindex_rows, ignore=True) |
+ |
+ def ReindexIssues(self, cnxn, num_to_reindex, user_service): |
+ """Reindex some issues specified in the IndexQueue table.""" |
+ rows = self.reindexqueue_tbl.Select( |
+ cnxn, order_by=[('created', [])], limit=num_to_reindex) |
+ issue_ids = [row[0] for row in rows] |
+ |
+ if issue_ids: |
+ issues = self.GetIssues(cnxn, issue_ids) |
+ tracker_fulltext.IndexIssues( |
+ cnxn, issues, user_service, self, self._config_service) |
+ self.reindexqueue_tbl.Delete(cnxn, issue_id=issue_ids) |
+ |
+ return len(issue_ids) |
+ |
+ ### Search functions |
+ |
+ def RunIssueQuery( |
+ self, cnxn, left_joins, where, order_by, shard_id=None, limit=None): |
+ """Run a SQL query to find matching issue IDs. |
+ |
+ Args: |
+ cnxn: connection to SQL database. |
+ left_joins: list of SQL LEFT JOIN clauses. |
+ where: list of SQL WHERE clauses. |
+ order_by: list of SQL ORDER BY clauses. |
+ shard_id: int shard ID to focus the search. |
+ limit: int maximum number of results, defaults to |
+ settings.search_limit_per_shard. |
+ |
+ Returns: |
+ (issue_ids, capped) where issue_ids is a list of the result issue IDs, |
+ and capped is True if the number of results reached the limit. |
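+
+      For example (a hypothetical query), passing
+      left_joins=[('Issue2Label ON Issue.id = Issue2Label.issue_id', [])]
+      and where=[('Issue2Label.label_id = %s', [42])] finds issues carrying
+      label 42; each clause is an (SQL fragment, args) pair.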
+ """ |
+ limit = limit or settings.search_limit_per_shard |
+ where = where + [('Issue.deleted = %s', [False])] |
+ rows = self.issue_tbl.Select( |
+ cnxn, shard_id=shard_id, distinct=True, cols=['Issue.id'], |
+ left_joins=left_joins, where=where, order_by=order_by, |
+ limit=limit) |
+ issue_ids = [row[0] for row in rows] |
+ capped = len(issue_ids) >= limit |
+ return issue_ids, capped |
+ |
+ def GetIIDsByLabelIDs(self, cnxn, label_ids, project_id, shard_id): |
+ """Return a list of IIDs for issues with any of the given label IDs.""" |
+ where = [] |
+ if shard_id is not None: |
+ slice_term = ('shard = %s', [shard_id]) |
+ where.append(slice_term) |
+ |
+ rows = self.issue_tbl.Select( |
+ cnxn, shard_id=shard_id, cols=['id'], |
+ left_joins=[('Issue2Label ON Issue.id = Issue2Label.issue_id', [])], |
+ label_id=label_ids, project_id=project_id, where=where) |
+ |
+ return [row[0] for row in rows] |
+ |
+ def GetIIDsByParticipant(self, cnxn, user_ids, project_ids, shard_id): |
+ """Return IIDs for issues where any of the given users participate.""" |
+ iids = [] |
+ where = [] |
+ if shard_id is not None: |
+ where.append(('shard = %s', [shard_id])) |
+ if project_ids: |
+ cond_str = 'Issue.project_id IN (%s)' % sql.PlaceHolders(project_ids) |
+ where.append((cond_str, project_ids)) |
+ |
+ # TODO(jrobbins): Combine these 3 queries into one with ORs. It currently |
+ # is not the bottleneck. |
+ rows = self.issue_tbl.Select( |
+ cnxn, cols=['id'], reporter_id=user_ids, |
+ where=where, shard_id=shard_id) |
+ for row in rows: |
+ iids.append(row[0]) |
+ |
+ rows = self.issue_tbl.Select( |
+ cnxn, cols=['id'], owner_id=user_ids, |
+ where=where, shard_id=shard_id) |
+ for row in rows: |
+ iids.append(row[0]) |
+ |
+ rows = self.issue_tbl.Select( |
+ cnxn, cols=['id'], derived_owner_id=user_ids, |
+ where=where, shard_id=shard_id) |
+ for row in rows: |
+ iids.append(row[0]) |
+ |
+ rows = self.issue_tbl.Select( |
+ cnxn, cols=['id'], |
+ left_joins=[('Issue2Cc ON Issue2Cc.issue_id = Issue.id', [])], |
+ cc_id=user_ids, |
+ where=where + [('cc_id IS NOT NULL', [])], |
+ shard_id=shard_id) |
+ for row in rows: |
+ iids.append(row[0]) |
+ |
+ rows = self.issue_tbl.Select( |
+ cnxn, cols=['Issue.id'], |
+ left_joins=[ |
+ ('Issue2FieldValue ON Issue.id = Issue2FieldValue.issue_id', []), |
+ ('FieldDef ON Issue2FieldValue.field_id = FieldDef.id', [])], |
+ user_id=user_ids, grants_perm='View', |
+ where=where + [('user_id IS NOT NULL', [])], |
+ shard_id=shard_id) |
+ for row in rows: |
+ iids.append(row[0]) |
+ |
+ return iids |
+ |
+ |
+def _UpdateClosedTimestamp(config, issue, old_effective_status): |
+ """Sets or unsets the closed_timestamp based based on status changes. |
+ |
+ If the status is changing from open to closed, the closed_timestamp is set to |
+ the current time. |
+ |
+  If the status is changing from closed to open, the closed_timestamp is
+  unset.
+ |
+  If the status is changing from one closed status to another, or from one
+  open status to another, nothing is changed.
+ |
+ Args: |
+ config: the project configuration |
+ issue: the issue being updated (a protocol buffer) |
+ old_effective_status: the old issue status string. E.g., 'New' |
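+
+  For example, a change from 'New' (open) to 'Fixed' (closed) sets
+  closed_timestamp, while a change from 'Fixed' back to 'New' clears it,
+  assuming the project's default open and closed status definitions.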
+ """ |
+ # open -> closed |
+ if (tracker_helpers.MeansOpenInProject(old_effective_status, config) |
+ and not tracker_helpers.MeansOpenInProject( |
+ tracker_bizobj.GetStatus(issue), config)): |
+ |
+ logging.info('setting closed_timestamp on issue: %d', issue.local_id) |
+ |
+ issue.closed_timestamp = int(time.time()) |
+ return |
+ |
+ # closed -> open |
+ if (not tracker_helpers.MeansOpenInProject(old_effective_status, config) |
+ and tracker_helpers.MeansOpenInProject( |
+ tracker_bizobj.GetStatus(issue), config)): |
+ |
+ logging.info('clearing closed_timestamp on issue: %s', issue.local_id) |
+ |
+ issue.reset('closed_timestamp') |
+ return |
+ |
+ |
+class Error(Exception): |
+ """Base exception class for this package.""" |
+ pass |
+ |
+ |
+class NoSuchIssueException(Error): |
+ """The requested issue was not found.""" |
+ pass |
+ |
+ |
+class NoSuchAttachmentException(Error): |
+ """The requested attachment was not found.""" |
+ pass |
+ |
+ |
+class NoSuchCommentException(Error): |
+ """The requested comment was not found.""" |
+ pass |
+ |
+ |
+class MidAirCollisionException(Error): |
+ """The item was updated by another user at the same time.""" |
+ |
+ def __init__(self, name, continue_issue_id): |
+ super(MidAirCollisionException, self).__init__() |
+ self.name = name # human-readable name for the artifact being edited. |
+ self.continue_issue_id = continue_issue_id # ID of issue to start over. |