Index: appengine/monorail/testing/fake.py |
diff --git a/appengine/monorail/testing/fake.py b/appengine/monorail/testing/fake.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..6447078e7e98fa2ae1370c9a9c794680dae756ba |
--- /dev/null |
+++ b/appengine/monorail/testing/fake.py |
@@ -0,0 +1,1531 @@ |
+# Copyright 2016 The Chromium Authors. All rights reserved. |
+# Use of this source code is govered by a BSD-style |
+# license that can be found in the LICENSE file or at |
+# https://developers.google.com/open-source/licenses/bsd |
+ |
+"""Fake object classes that are useful for unit tests.""" |
+ |
+import collections |
+import logging |
+import re |
+ |
+import settings |
+from framework import framework_helpers |
+from framework import monorailrequest |
+from framework import permissions |
+from framework import validate |
+from proto import project_pb2 |
+from proto import tracker_pb2 |
+from proto import user_pb2 |
+from proto import usergroup_pb2 |
+from services import caches |
+from services import issue_svc |
+from services import project_svc |
+from services import user_svc |
+from tracker import tracker_bizobj |
+from tracker import tracker_constants |
+ |
+# Many fakes return partial or constant values, regardless of their arguments. |
+# pylint: disable=unused-argument |
+ |
+BOUNDARY = '-----thisisaboundary' |
+OWNER_ROLE = 'OWNER_ROLE' |
+COMMITTER_ROLE = 'COMMITTER_ROLE' |
+CONTRIBUTOR_ROLE = 'CONTRIBUTOR_ROLE' |
+ |
+ |
+def Project( |
+ project_name='proj', project_id=None, state=project_pb2.ProjectState.LIVE, |
+ access=project_pb2.ProjectAccess.ANYONE, moved_to=None, |
+ cached_content_timestamp=None, |
+ owner_ids=None, committer_ids=None, contributor_ids=None): |
+ """Returns a project protocol buffer with the given attributes.""" |
+ project_id = project_id or hash(project_name) |
+ return project_pb2.MakeProject( |
+ project_name, project_id=project_id, state=state, access=access, |
+ moved_to=moved_to, cached_content_timestamp=cached_content_timestamp, |
+ owner_ids=owner_ids, committer_ids=committer_ids, |
+ contributor_ids=contributor_ids) |
+ |
+ |
+def MakeTestIssue( |
+ project_id, local_id, summary, status, owner_id, labels=None, |
+ derived_labels=None, derived_status=None, merged_into=0, star_count=0, |
+ derived_owner_id=0, issue_id=None, reporter_id=None, opened_timestamp=None, |
+ closed_timestamp=None, modified_timestamp=None, is_spam=False, |
+ component_ids=None, project_name=None, field_values=None): |
+ """Easily make an Issue for testing.""" |
+ issue = tracker_pb2.Issue() |
+ issue.project_id = project_id |
+ issue.project_name = project_name |
+ issue.local_id = local_id |
+ issue.issue_id = issue_id if issue_id else 100000 + local_id |
+ issue.reporter_id = reporter_id if reporter_id else owner_id |
+ issue.summary = summary |
+ issue.status = status |
+ issue.owner_id = owner_id |
+ issue.derived_owner_id = derived_owner_id |
+ issue.star_count = star_count |
+ issue.merged_into = merged_into |
+ issue.is_spam = is_spam |
+ if opened_timestamp: |
+ issue.opened_timestamp = opened_timestamp |
+ if modified_timestamp: |
+ issue.modified_timestamp = modified_timestamp |
+ if closed_timestamp: |
+ issue.closed_timestamp = closed_timestamp |
+ if labels is not None: |
+ if isinstance(labels, basestring): |
+ labels = labels.split() |
+ issue.labels.extend(labels) |
+ if derived_labels is not None: |
+ if isinstance(derived_labels, basestring): |
+ derived_labels = derived_labels.split() |
+ issue.derived_labels.extend(derived_labels) |
+ if derived_status is not None: |
+ issue.derived_status = derived_status |
+ if component_ids is not None: |
+ issue.component_ids = component_ids |
+ if field_values is not None: |
+ issue.field_values = field_values |
+ return issue |
+ |
+ |
+def MakeTestConfig(project_id, labels, statuses): |
+ """Convenient function to make a ProjectIssueConfig object.""" |
+ config = tracker_bizobj.MakeDefaultProjectIssueConfig(project_id) |
+ if isinstance(labels, basestring): |
+ labels = labels.split() |
+ if isinstance(statuses, basestring): |
+ statuses = statuses.split() |
+ config.well_known_labels = [ |
+ tracker_pb2.LabelDef(label=lab) for lab in labels] |
+ config.well_known_statuses = [ |
+ tracker_pb2.StatusDef(status=stat) for stat in statuses] |
+ return config |
+ |
+ |
+class MonorailConnection(object): |
+ """Fake connection to databases for use in tests.""" |
+ |
+ def Commit(self): |
+ pass |
+ |
+ def Close(self): |
+ pass |
+ |
+ |
+class MonorailRequest(monorailrequest.MonorailRequest): |
+ """Subclass of MonorailRequest suitable for testing.""" |
+ |
+ def __init__(self, user_info=None, project=None, perms=None, **kwargs): |
+ """Construct a test MonorailRequest. |
+ |
+ Typically, this is constructed via testing.helpers.GetRequestObjects, |
+ which also causes url parsing and optionally initializes the user, |
+ project, and permissions info. |
+ |
+ Args: |
+ user_info: a dict of user attributes to set on a MonorailRequest object. |
+ For example, "user_id: 5" causes self.auth.user_id=5. |
+ project: the Project pb for this request. |
+ perms: a PermissionSet for this request. |
+ """ |
+ super(MonorailRequest, self).__init__(**kwargs) |
+ |
+ if user_info is not None: |
+ for key in user_info: |
+ setattr(self.auth, key, user_info[key]) |
+ if 'user_id' in user_info: |
+ self.auth.effective_ids = {user_info['user_id']} |
+ |
+ self.perms = perms or permissions.ADMIN_PERMISSIONSET |
+ self.project = project |
+ |
+ |
+class UserGroupService(object): |
+ """Fake UserGroupService class for testing other code.""" |
+ |
+ def __init__(self): |
+ self.group_settings = {} |
+ self.group_members = {} |
+ self.group_addrs = {} |
+ self.role_dict = {} |
+ |
+ def TestAddGroupSettings( |
+ self, group_id, email, who_can_view=None, anyone_can_join=False, |
+ who_can_add=None, external_group_type=None, |
+ last_sync_time=0, friend_projects=None): |
+ """Set up a fake group for testing. |
+ |
+ Args: |
+ group_id: int user ID of the new user group. |
+ email: string email address to identify the user group. |
+ who_can_view: string enum 'owners', 'members', or 'anyone'. |
+ anyone_can_join: optional boolean to allow any users to join the group. |
+ who_can_add: optional list of int user IDs of users who can add |
+ more members to the group. |
+ """ |
+ friend_projects = friend_projects or [] |
+ group_settings = usergroup_pb2.MakeSettings( |
+ who_can_view or 'members', |
+ external_group_type, last_sync_time, friend_projects) |
+ self.group_settings[group_id] = group_settings |
+ self.group_addrs[group_id] = email |
+ # TODO(jrobbins): store the other settings. |
+ |
+ def TestAddMembers(self, group_id, user_ids, role='member'): |
+ self.group_members.setdefault(group_id, []).extend(user_ids) |
+ for user_id in user_ids: |
+ self.role_dict.setdefault(group_id, {})[user_id] = role |
+ |
+ def LookupMemberships(self, _cnxn, user_id): |
+ memberships = { |
+ group_id for group_id, member_ids in self.group_members.iteritems() |
+ if user_id in member_ids} |
+ return memberships |
+ |
+ def DetermineWhichUserIDsAreGroups(self, _cnxn, user_ids): |
+ return [uid for uid in user_ids |
+ if uid in self.group_settings] |
+ |
+ def GetAllUserGroupsInfo(self, cnxn): |
+ infos = [] |
+ for group_id in self.group_settings: |
+ infos.append( |
+ (self.group_addrs[group_id], |
+ len(self.group_members.get(group_id, [])), |
+ self.group_settings[group_id], group_id)) |
+ |
+ return infos |
+ |
+ def GetAllGroupSettings(self, _cnxn, group_ids): |
+ return {gid: self.group_settings[gid] |
+ for gid in group_ids |
+ if gid in self.group_settings} |
+ |
+ def GetGroupSettings(self, cnxn, group_id): |
+ return self.GetAllGroupSettings(cnxn, [group_id]).get(group_id) |
+ |
+ def CreateGroup(self, cnxn, services, email, who_can_view_members, |
+ ext_group_type=None, friend_projects=None): |
+ friend_projects = friend_projects or [] |
+ group_id = services.user.LookupUserID( |
+ cnxn, email, autocreate=True, allowgroups=True) |
+ group_settings = usergroup_pb2.MakeSettings( |
+ who_can_view_members, ext_group_type, 0, friend_projects) |
+ self.UpdateSettings(cnxn, group_id, group_settings) |
+ return group_id |
+ |
+ def DeleteGroups(self, cnxn, group_ids): |
+ member_ids_dict, owner_ids_dict = self.LookupMembers(cnxn, group_ids) |
+ citizens_id_dict = collections.defaultdict(list) |
+ for g_id, user_ids in member_ids_dict.iteritems(): |
+ citizens_id_dict[g_id].extend(user_ids) |
+ for g_id, user_ids in owner_ids_dict.iteritems(): |
+ citizens_id_dict[g_id].extend(user_ids) |
+ for g_id, citizen_ids in citizens_id_dict.iteritems(): |
+ # Remove group members, friend projects and settings |
+ self.RemoveMembers(cnxn, g_id, citizen_ids) |
+ self.group_settings.pop(g_id, None) |
+ |
+ def LookupMembers(self, _cnxn, group_id_list): |
+ members_dict = {} |
+ owners_dict = {} |
+ for gid in group_id_list: |
+ members_dict[gid] = [] |
+ owners_dict[gid] = [] |
+ for mid in self.group_members.get(gid, []): |
+ if self.role_dict.get(gid, {}).get(mid) == 'owner': |
+ owners_dict[gid].append(mid) |
+ elif self.role_dict.get(gid, {}).get(mid) == 'member': |
+ members_dict[gid].append(mid) |
+ return members_dict, owners_dict |
+ |
+ def LookupAllMembers(self, _cnxn, group_id_list): |
+ direct_members, direct_owners = self.LookupMembers( |
+ _cnxn, group_id_list) |
+ members_dict = {} |
+ owners_dict = {} |
+ for gid in group_id_list: |
+ members = direct_members[gid] |
+ owners = direct_owners[gid] |
+ owners_dict[gid] = owners |
+ members_dict[gid] = members |
+ group_ids = set([uid for uid in members + owners |
+ if uid in self.group_settings]) |
+ while group_ids: |
+ indirect_members, indirect_owners = self.LookupMembers( |
+ _cnxn, group_ids) |
+ child_members = set() |
+ child_owners = set() |
+ for _, children in indirect_members.iteritems(): |
+ child_members.update(children) |
+ for _, children in indirect_owners.iteritems(): |
+ child_owners.update(children) |
+ members_dict[gid].extend(list(child_members)) |
+ owners_dict[gid].extend(list(child_owners)) |
+ group_ids = set(self.DetermineWhichUserIDsAreGroups( |
+ _cnxn, list(child_members) + list(child_owners))) |
+ members_dict[gid] = list(set(members_dict[gid])) |
+ return members_dict, owners_dict |
+ |
+ |
+ def RemoveMembers(self, _cnxn, group_id, old_member_ids): |
+ current_member_ids = self.group_members.get(group_id, []) |
+ revised_member_ids = [mid for mid in current_member_ids |
+ if mid not in old_member_ids] |
+ self.group_members[group_id] = revised_member_ids |
+ |
+ def UpdateMembers(self, _cnxn, group_id, member_ids, new_role): |
+ self.RemoveMembers(_cnxn, group_id, member_ids) |
+ self.TestAddMembers(group_id, member_ids, new_role) |
+ |
+ def UpdateSettings(self, _cnxn, group_id, group_settings): |
+ self.group_settings[group_id] = group_settings |
+ |
+ def ExpandAnyUserGroups(self, cnxn, user_ids): |
+ group_ids = set(self.DetermineWhichUserIDsAreGroups(cnxn, user_ids)) |
+ direct_ids = [uid for uid in user_ids if uid not in group_ids] |
+ member_ids_dict, owner_ids_dict = self.LookupAllMembers(cnxn, group_ids) |
+ |
+ indirect_ids = set() |
+ for gid in group_ids: |
+ indirect_ids.update(member_ids_dict[gid]) |
+ indirect_ids.update(owner_ids_dict[gid]) |
+ # It's possible that a user has both direct and indirect memberships of |
+ # one group. In this case, mark the user as direct member only. |
+ indirect_ids = [iid for iid in indirect_ids if iid not in direct_ids] |
+ |
+ return direct_ids, list(indirect_ids) |
+ |
+ def LookupVisibleMembers( |
+ self, cnxn, group_id_list, perms, effective_ids, services): |
+ settings_dict = self.GetAllGroupSettings(cnxn, group_id_list) |
+ group_ids = settings_dict.keys() |
+ |
+ direct_member_ids_dict, direct_owner_ids_dict = self.LookupMembers( |
+ cnxn, group_ids) |
+ all_member_ids_dict, all_owner_ids_dict = self.LookupAllMembers( |
+ cnxn, group_ids) |
+ visible_member_ids_dict = {} |
+ visible_owner_ids_dict = {} |
+ for gid in group_ids: |
+ member_ids = all_member_ids_dict[gid] |
+ owner_ids = all_owner_ids_dict[gid] |
+ if permissions.CanViewGroup(perms, effective_ids, settings_dict[gid], |
+ member_ids, owner_ids, []): |
+ visible_member_ids_dict[gid] = direct_member_ids_dict[gid] |
+ visible_owner_ids_dict[gid] = direct_owner_ids_dict[gid] |
+ |
+ return visible_member_ids_dict, visible_owner_ids_dict |
+ |
+ def ValidateFriendProjects(self, cnxn, services, friend_projects): |
+ project_names = filter(None, re.split('; |, | |;|,', friend_projects)) |
+ id_dict = services.project.LookupProjectIDs(cnxn, project_names) |
+ missed_projects = [] |
+ result = [] |
+ for p_name in project_names: |
+ if p_name in id_dict: |
+ result.append(id_dict[p_name]) |
+ else: |
+ missed_projects.append(p_name) |
+ error_msg = '' |
+ if missed_projects: |
+ error_msg = 'Project(s) %s do not exist' % ', '.join(missed_projects) |
+ return None, error_msg |
+ else: |
+ return result, None |
+ |
+ |
+class CacheManager(object): |
+ |
+ def __init__(self, invalidate_tbl=None): |
+ self.last_call = None |
+ self.processed_invalidations_up_to = 0 |
+ |
+ def MakeCache(self, kind, max_size=None, use_value_centric_cache=False): |
+ """Make a new cache and register it for future invalidations.""" |
+ if use_value_centric_cache: |
+ cache = caches.ValueCentricRamCache(self, kind, max_size=max_size) |
+ else: |
+ cache = caches.RamCache(self, kind, max_size=max_size) |
+ return cache |
+ |
+ def DoDistributedInvalidation(self, cnxn): |
+ """Drop any cache entries that were invalidated by other jobs.""" |
+ self.last_call = 'DoDistributedInvalidation', cnxn |
+ |
+ def StoreInvalidateRows(self, cnxn, kind, keys): |
+ """Store database rows to let all frontends know to invalidate.""" |
+ self.last_call = 'StoreInvalidateRows', cnxn, kind, keys |
+ |
+ def StoreInvalidateAll(self, cnxn, kind): |
+ """Store a database row to let all frontends know to invalidate.""" |
+ self.last_call = 'StoreInvalidateAll', cnxn, kind |
+ |
+ |
+ |
+class UserService(object): |
+ |
+ def __init__(self): |
+ """Creates a test-appropriate UserService object.""" |
+ self.users_by_email = {} |
+ self.users_by_id = {} |
+ self.test_users = {} |
+ |
+ def TestAddUser(self, email, user_id, add_user=True, banned=False): |
+ """Add a user to the fake UserService instance. |
+ |
+ Args: |
+ email: Email of the user. |
+ user_id: int user ID. |
+ add_user: Flag whether user pb should be created, i.e. whether a |
+ Monorail account should be created |
+ banned: Boolean to set the user as banned |
+ |
+ Returns: |
+ The User PB that was added, or None. |
+ """ |
+ self.users_by_email[email] = user_id |
+ self.users_by_id[user_id] = email |
+ |
+ user = None |
+ if add_user: |
+ user = user_pb2.MakeUser() |
+ user.is_site_admin = False |
+ user.email = email |
+ user.obscure_email = True |
+ if banned: |
+ user.banned = 'is banned' |
+ self.test_users[user_id] = user |
+ |
+ return user |
+ |
+ def GetUser(self, _cnxn, user_id): |
+ return self.test_users.get(user_id) |
+ |
+ def _CreateUser(self, _cnxn, email): |
+ if email in self.users_by_email: |
+ return |
+ user_id = framework_helpers.MurmurHash3_x86_32(email) |
+ self.users_by_id[user_id] = email |
+ self.users_by_email[email] = user_id |
+ |
+ def _CreateUsers(self, cnxn, emails): |
+ for email in emails: |
+ self._CreateUser(cnxn, email) |
+ |
+ def LookupUserID(self, cnxn, email, autocreate=False, allowgroups=False): |
+ user_id = self.users_by_email.get(email) |
+ if not user_id and validate.IsValidEmail(email): |
+ if autocreate: |
+ self._CreateUser(cnxn, email) |
+ user_id = self.users_by_email.get(email) |
+ else: |
+ raise user_svc.NoSuchUserException(email) |
+ |
+ return user_id |
+ |
+ def GetUsersByIDs(self, cnxn, user_ids, use_cache=True): |
+ user_dict = {} |
+ for user_id in user_ids: |
+ if user_id and self.test_users.get(user_id): |
+ user_dict[user_id] = self.test_users[user_id] |
+ return user_dict |
+ |
+ def LookupExistingUserIDs(self, cnxn, emails): |
+ email_dict = { |
+ email: self.users_by_email[email] |
+ for email in emails |
+ if email in self.users_by_email} |
+ return email_dict |
+ |
+ def LookupUserIDs(self, cnxn, emails, autocreate=False, |
+ allowgroups=False): |
+ email_dict = {} |
+ for email in emails: |
+ user_id = self.LookupUserID( |
+ cnxn, email, autocreate=autocreate, allowgroups=allowgroups) |
+ if user_id: |
+ email_dict[email] = user_id |
+ return email_dict |
+ |
+ def LookupUserEmail(self, _cnxn, user_id): |
+ email = self.users_by_id.get(user_id) |
+ return email |
+ |
+ def LookupUserEmails(self, cnxn, user_ids): |
+ user_dict = { |
+ user_id: self.LookupUserEmail(cnxn, user_id) |
+ for user_id in user_ids} |
+ return user_dict |
+ |
+ def UpdateUser(self, _cnxn, user_id, user): |
+ """Updates the user pb.""" |
+ self.test_users[user_id] = user |
+ |
+ def UpdateUserSettings( |
+ self, cnxn, user_id, user, notify=None, notify_starred=None, |
+ obscure_email=None, after_issue_update=None, |
+ is_site_admin=None, ignore_action_limits=None, |
+ is_banned=None, banned_reason=None, action_limit_updates=None, |
+ dismissed_cues=None, keep_people_perms_open=None, preview_on_hover=None): |
+ self.UpdateUser(cnxn, user_id, user) |
+ |
+ |
+class AbstractStarService(object): |
+ """Fake StarService.""" |
+ |
+ def __init__(self): |
+ self.stars_by_item_id = {} |
+ self.stars_by_starrer_id = {} |
+ self.expunged_item_ids = [] |
+ |
+ def ExpungeStars(self, _cnxn, item_id): |
+ self.expunged_item_ids.append(item_id) |
+ old_starrer = self.stars_by_item_id.get(item_id) |
+ self.stars_by_item_id[item_id] = [] |
+ if self.stars_by_starrer_id.get(old_starrer): |
+ self.stars_by_starrer_id[old_starrer] = [ |
+ it for it in self.stars_by_starrer_id[old_starrer] |
+ if it != item_id] |
+ |
+ def LookupItemStarrers(self, _cnxn, item_id): |
+ return self.stars_by_item_id.get(item_id, []) |
+ |
+ def LookupStarredItemIDs(self, _cnxn, starrer_user_id): |
+ return self.stars_by_starrer_id.get(starrer_user_id, []) |
+ |
+ def IsItemStarredBy(self, cnxn, item_id, starrer_user_id): |
+ return item_id in self.LookupStarredItemIDs(cnxn, starrer_user_id) |
+ |
+ def CountItemStars(self, cnxn, item_id): |
+ return len(self.LookupItemStarrers(cnxn, item_id)) |
+ |
+ def CountItemsStars(self, cnxn, item_ids): |
+ return {item_id: self.CountItemStars(cnxn, item_id) |
+ for item_id in item_ids} |
+ |
+ def SetStar(self, cnxn, item_id, starrer_user_id, starred): |
+ if starred and not self.IsItemStarredBy(cnxn, item_id, starrer_user_id): |
+ self.stars_by_item_id.setdefault(item_id, []).append(starrer_user_id) |
+ self.stars_by_starrer_id.setdefault(starrer_user_id, []).append(item_id) |
+ |
+ elif not starred and self.IsItemStarredBy(cnxn, item_id, starrer_user_id): |
+ self.stars_by_item_id[item_id].remove(starrer_user_id) |
+ self.stars_by_starrer_id[starrer_user_id].remove(item_id) |
+ |
+ |
+class UserStarService(AbstractStarService): |
+ pass |
+ |
+ |
+class ProjectStarService(AbstractStarService): |
+ pass |
+ |
+ |
+class IssueStarService(AbstractStarService): |
+ |
+ # pylint: disable=arguments-differ |
+ def SetStar( |
+ self, cnxn, _service, _config, issue_id, starrer_user_id, |
+ starred): |
+ super(IssueStarService, self).SetStar( |
+ cnxn, issue_id, starrer_user_id, starred) |
+ |
+ |
+class ProjectService(object): |
+ """Fake ProjectService object. |
+ |
+ Provides methods for creating users and projects, which are accessible |
+ through parts of the real ProjectService interface. |
+ """ |
+ |
+ def __init__(self): |
+ self.test_projects = {} # project_name -> project_pb |
+ self.projects_by_id = {} |
+ self.test_star_manager = None |
+ self.indexed_projects = {} |
+ self.unindexed_projects = set() |
+ self.index_counter = 0 |
+ self.project_commitments = {} |
+ |
+ def TestAddProject( |
+ self, name, summary='', state=project_pb2.ProjectState.LIVE, |
+ owner_ids=None, committer_ids=None, contrib_ids=None, |
+ issue_notify_address=None, state_reason='', |
+ description=None, project_id=None, process_inbound_email=None, |
+ access=None): |
+ """Add a project to the fake ProjectService object. |
+ |
+ Args: |
+ name: The name of the project. Will replace any existing project under |
+ the same name. |
+ summary: The summary string of the project. |
+ state: Initial state for the project from project_pb2.ProjectState. |
+ owner_ids: List of user ids for project owners |
+ committer_ids: List of user ids for project committers |
+ contrib_ids: List of user ids for project contributors |
+ issue_notify_address: email address to send issue change notifications |
+ state_reason: string describing the reason the project is in its current |
+ state. |
+ description: The description string for this project |
+ project_id: A unique integer identifier for the created project. |
+ process_inbound_email: True to make this project accept inbound email. |
+ access: One of the values of enum project_pb2.ProjectAccess. |
+ |
+ Returns: |
+ A populated project PB. |
+ """ |
+ proj_pb = project_pb2.Project() |
+ proj_pb.project_id = project_id or hash(name) % 100000 |
+ proj_pb.project_name = name |
+ proj_pb.summary = summary |
+ proj_pb.state = state |
+ proj_pb.state_reason = state_reason |
+ if description is not None: |
+ proj_pb.description = description |
+ |
+ self.TestAddProjectMembers(owner_ids, proj_pb, OWNER_ROLE) |
+ self.TestAddProjectMembers(committer_ids, proj_pb, COMMITTER_ROLE) |
+ self.TestAddProjectMembers(contrib_ids, proj_pb, CONTRIBUTOR_ROLE) |
+ |
+ if issue_notify_address is not None: |
+ proj_pb.issue_notify_address = issue_notify_address |
+ if process_inbound_email is not None: |
+ proj_pb.process_inbound_email = process_inbound_email |
+ if access is not None: |
+ proj_pb.access = access |
+ |
+ self.test_projects[name] = proj_pb |
+ self.projects_by_id[proj_pb.project_id] = proj_pb |
+ return proj_pb |
+ |
+ def TestAddProjectMembers(self, user_id_list, proj_pb, role): |
+ if user_id_list is not None: |
+ for user_id in user_id_list: |
+ if role == OWNER_ROLE: |
+ proj_pb.owner_ids.append(user_id) |
+ elif role == COMMITTER_ROLE: |
+ proj_pb.committer_ids.append(user_id) |
+ elif role == CONTRIBUTOR_ROLE: |
+ proj_pb.contributor_ids.append(user_id) |
+ |
+ def LookupProjectIDs(self, cnxn, project_names): |
+ return { |
+ project_name: self.test_projects[project_name].project_id |
+ for project_name in project_names |
+ if project_name in self.test_projects} |
+ |
+ def LookupProjectNames(self, cnxn, project_ids): |
+ projects_dict = self.GetProjects(cnxn, project_ids) |
+ return {p.project_id: p.project_name |
+ for p in projects_dict.itervalues()} |
+ |
+ def CreateProject( |
+ self, _cnxn, project_name, owner_ids, committer_ids, |
+ contributor_ids, summary, description, |
+ state=project_pb2.ProjectState.LIVE, access=None, read_only=None, |
+ home_page=None, docs_url=None, logo_gcs_id=None, logo_file_name=None): |
+ """Create and store a Project with the given attributes.""" |
+ if project_name in self.test_projects: |
+ raise project_svc.ProjectAlreadyExists() |
+ self.TestAddProject( |
+ project_name, summary=summary, state=state, |
+ owner_ids=owner_ids, committer_ids=committer_ids, |
+ contrib_ids=contributor_ids, description=description, |
+ access=access) |
+ |
+ def ExpungeProject(self, _cnxn, project_id): |
+ project = self.projects_by_id.get(project_id) |
+ if project: |
+ self.test_projects.pop(project.project_name, None) |
+ |
+ def GetProjectsByName(self, _cnxn, project_name_list, use_cache=True): |
+ return { |
+ pn: self.test_projects[pn] for pn in project_name_list |
+ if pn in self.test_projects} |
+ |
+ def GetProjectByName(self, _cnxn, name, use_cache=True): |
+ return self.test_projects.get(name) |
+ |
+ def GetProjectList(self, cnxn, project_id_list, use_cache=True): |
+ project_dict = self.GetProjects(cnxn, project_id_list, use_cache=use_cache) |
+ return [project_dict[pid] for pid in project_id_list |
+ if pid in project_dict] |
+ |
+ def GetVisibleLiveProjects(self, _cnxn, logged_in_user, effective_ids, |
+ use_cache=True): |
+ return self.projects_by_id.keys() |
+ |
+ def GetProjects(self, _cnxn, project_ids, use_cache=True): |
+ result = {} |
+ for project_id in project_ids: |
+ project = self.projects_by_id.get(project_id) |
+ if project: |
+ result[project_id] = project |
+ return result |
+ |
+ def GetProject(self, cnxn, project_id, use_cache=True): |
+ """Load the specified project from the database.""" |
+ project_id_dict = self.GetProjects(cnxn, [project_id], use_cache=use_cache) |
+ return project_id_dict.get(project_id) |
+ |
+ @staticmethod |
+ def IsValidProjectName(string): |
+ """Return true if the given string is a valid project name.""" |
+ return project_svc.RE_PROJECT_NAME.match(string) |
+ |
+ def GetProjectCommitments(self, _cnxn, project_id): |
+ if project_id in self.project_commitments: |
+ return self.project_commitments[project_id] |
+ |
+ project_commitments = project_pb2.ProjectCommitments() |
+ project_commitments.project_id = project_id |
+ return project_commitments |
+ |
+ def TestStoreProjectCommitments(self, project_commitments): |
+ key = project_commitments.project_id |
+ self.project_commitments[key] = project_commitments |
+ |
+ def UpdateProject( |
+ self, _cnxn, project_id, summary=None, description=None, |
+ state=None, state_reason=None, access=None, |
+ issue_notify_address=None, attachment_bytes_used=None, |
+ attachment_quota=None, moved_to=None, process_inbound_email=None, |
+ only_owners_remove_restrictions=None, |
+ read_only_reason=None, cached_content_timestamp=None, |
+ only_owners_see_contributors=None, delete_time=None, |
+ recent_activity=None, revision_url_format=None, home_page=None, |
+ docs_url=None, logo_gcs_id=None, logo_file_name=None): |
+ project = self.projects_by_id.get(project_id) |
+ if not project: |
+ raise project_svc.NoSuchProjectException( |
+ 'Project "%s" not found!' % project_id) |
+ |
+ # TODO(jrobbins): implement all passed arguments - probably as a utility |
+ # method shared with the real persistence implementation. |
+ if read_only_reason is not None: |
+ project.read_only_reason = read_only_reason |
+ |
+ def UpdateProjectRoles( |
+ self, _cnxn, project_id, owner_ids, committer_ids, |
+ contributor_ids, now=None): |
+ project = self.projects_by_id.get(project_id) |
+ if not project: |
+ raise project_svc.NoSuchProjectException( |
+ 'Project "%s" not found!' % project_id) |
+ |
+ project.owner_ids = owner_ids |
+ project.committer_ids = committer_ids |
+ project.contributor_ids = contributor_ids |
+ |
+ def MarkProjectDeletable( |
+ self, _cnxn, project_id, _config_service): |
+ project = self.projects_by_id[project_id] |
+ project.project_name = 'DELETABLE_%d' % project_id |
+ project.state = project_pb2.ProjectState.DELETABLE |
+ |
+ def UpdateRecentActivity(self, _cnxn, _project_id, now=None): |
+ pass |
+ |
+ def GetUserRolesInAllProjects(self, _cnxn, effective_ids): |
+ owned_project_ids = set() |
+ membered_project_ids = set() |
+ contrib_project_ids = set() |
+ |
+ for project in self.projects_by_id.itervalues(): |
+ if not effective_ids.isdisjoint(project.owner_ids): |
+ owned_project_ids.add(project.project_id) |
+ elif not effective_ids.isdisjoint(project.committer_ids): |
+ membered_project_ids.add(project.project_id) |
+ elif not effective_ids.isdisjoint(project.contributor_ids): |
+ contrib_project_ids.add(project.project_id) |
+ |
+ return owned_project_ids, membered_project_ids, contrib_project_ids |
+ |
+ |
+class ConfigService(object): |
+ """Fake version of ConfigService that just works in-RAM.""" |
+ |
+ def __init__(self, user_id=None): |
+ self.project_configs = {} |
+ self.next_field_id = 123 |
+ self.next_component_id = 345 |
+ self.expunged_configs = [] |
+ self.component_ids_to_templates = {} |
+ |
+ def TemplatesWithComponent(self, _cnxn, component_id, _config): |
+ return self.component_ids_to_templates.get(component_id, []) |
+ |
+ def ExpungeConfig(self, _cnxn, project_id): |
+ self.expunged_configs.append(project_id) |
+ |
+ def GetLabelDefRows(self, cnxn, project_id): |
+ """This always returns empty results. Mock it to test other cases.""" |
+ return [] |
+ |
+ def GetLabelDefRowsAnyProject(self, cnxn, where=None): |
+ """This always returns empty results. Mock it to test other cases.""" |
+ return [] |
+ |
+ def LookupLabel(self, cnxn, project_id, label_id): |
+ if label_id == 999: |
+ return None |
+ return 'label_%d_%d' % (project_id, label_id) |
+ |
+ def LookupLabelID(self, cnxn, project_id, label, autocreate=True): |
+ return 1 |
+ |
+ def LookupLabelIDs(self, cnxn, project_id, labels, autocreate=False): |
+ return [idx for idx, _label in enumerate(labels)] |
+ |
+ def LookupIDsOfLabelsMatching(self, cnxn, project_id, regex): |
+ return [1, 2, 3] |
+ |
+ def LookupStatus(self, cnxn, project_id, status_id): |
+ return 'status_%d_%d' % (project_id, status_id) |
+ |
+ def LookupStatusID(self, cnxn, project_id, status, autocreate=True): |
+ if status: |
+ return 1 |
+ else: |
+ return 0 |
+ |
+ def LookupStatusIDs(self, cnxn, project_id, statuses): |
+ return [idx for idx, _status in enumerate(statuses)] |
+ |
+ def LookupClosedStatusIDs(self, cnxn, project_id): |
+ return [7, 8, 9] |
+ |
+ def StoreConfig(self, _cnxn, config): |
+ self.project_configs[config.project_id] = config |
+ |
+ def GetProjectConfig(self, _cnxn, project_id, use_cache=True): |
+ if project_id in self.project_configs: |
+ return self.project_configs[project_id] |
+ else: |
+ return tracker_bizobj.MakeDefaultProjectIssueConfig(project_id) |
+ |
+ def GetProjectConfigs(self, _cnxn, project_ids, use_cache=True): |
+ config_dict = {} |
+ for project_id in project_ids: |
+ if project_id in self.project_configs: |
+ config_dict[project_id] = self.project_configs[project_id] |
+ else: |
+ config_dict[project_id] = tracker_bizobj.MakeDefaultProjectIssueConfig( |
+ project_id) |
+ |
+ return config_dict |
+ |
+ def UpdateConfig( |
+ self, cnxn, project, well_known_statuses=None, |
+ statuses_offer_merge=None, well_known_labels=None, |
+ excl_label_prefixes=None, templates=None, |
+ default_template_for_developers=None, default_template_for_users=None, |
+ list_prefs=None, restrict_to_known=None): |
+ project_id = project.project_id |
+ project_config = self.GetProjectConfig(cnxn, project_id, use_cache=False) |
+ |
+ if well_known_statuses is not None: |
+ tracker_bizobj.SetConfigStatuses(project_config, well_known_statuses) |
+ |
+ if statuses_offer_merge is not None: |
+ project_config.statuses_offer_merge = statuses_offer_merge |
+ |
+ if well_known_labels is not None: |
+ tracker_bizobj.SetConfigLabels(project_config, well_known_labels) |
+ |
+ if excl_label_prefixes is not None: |
+ project_config.exclusive_label_prefixes = excl_label_prefixes |
+ |
+ if templates is not None: |
+ project_config.templates = templates |
+ |
+ if default_template_for_developers is not None: |
+ project_config.default_template_for_developers = ( |
+ default_template_for_developers) |
+ if default_template_for_users is not None: |
+ project_config.default_template_for_users = default_template_for_users |
+ |
+ if list_prefs: |
+ default_col_spec, default_sort_spec, x_attr, y_attr = list_prefs |
+ project_config.default_col_spec = default_col_spec |
+ project_config.default_sort_spec = default_sort_spec |
+ project_config.default_x_attr = x_attr |
+ project_config.default_y_attr = y_attr |
+ |
+ if restrict_to_known is not None: |
+ project_config.restrict_to_known = restrict_to_known |
+ |
+ self.StoreConfig(cnxn, project_config) |
+ return project_config |
+ |
+ def CreateFieldDef( |
+ self, cnxn, project_id, field_name, field_type_str, applic_type, |
+ applic_pred, is_required, is_multivalued, |
+ min_value, max_value, regex, needs_member, needs_perm, |
+ grants_perm, notify_on, docstring, admin_ids): |
+ config = self.GetProjectConfig(cnxn, project_id) |
+ field_type = tracker_pb2.FieldTypes(field_type_str) |
+ field_id = self.next_field_id |
+ self.next_field_id += 1 |
+ fd = tracker_bizobj.MakeFieldDef( |
+ field_id, project_id, field_name, field_type, applic_type, applic_pred, |
+ is_required, is_multivalued, min_value, max_value, regex, |
+ needs_member, needs_perm, grants_perm, notify_on, docstring, False) |
+ config.field_defs.append(fd) |
+ self.StoreConfig(cnxn, config) |
+ |
+ def SoftDeleteFieldDef(self, cnxn, project_id, field_id): |
+ config = self.GetProjectConfig(cnxn, project_id) |
+ fd = tracker_bizobj.FindFieldDefByID(field_id, config) |
+ fd.is_deleted = True |
+ self.StoreConfig(cnxn, config) |
+ |
+ def UpdateFieldDef( |
+ self, cnxn, project_id, field_id, field_name=None, |
+ applicable_type=None, applicable_predicate=None, is_required=None, |
+ is_multivalued=None, min_value=None, max_value=None, regex=None, |
+ needs_member=None, needs_perm=None, grants_perm=None, notify_on=None, |
+ docstring=None, admin_ids=None): |
+ config = self.GetProjectConfig(cnxn, project_id) |
+ fd = tracker_bizobj.FindFieldDefByID(field_id, config) |
+ # pylint: disable=multiple-statements |
+ if field_name is not None: fd.field_name = field_name |
+ if applicable_type is not None: fd.applicable_type = applicable_type |
+ if applicable_predicate is not None: |
+ fd.applicable_predicate = applicable_predicate |
+ if is_required is not None: fd.is_required = is_required |
+ if is_multivalued is not None: fd.is_multivalued = is_multivalued |
+ if min_value is not None: fd.min_value = min_value |
+ if max_value is not None: fd.max_value = max_value |
+ if regex is not None: fd.regex = regex |
+ if docstring is not None: fd.docstring = docstring |
+ if admin_ids is not None: fd.admin_ids = admin_ids |
+ self.StoreConfig(cnxn, config) |
+ |
+ def CreateComponentDef( |
+ self, cnxn, project_id, path, docstring, deprecated, admin_ids, cc_ids, |
+ created, creator_id): |
+ config = self.GetProjectConfig(cnxn, project_id) |
+ cd = tracker_bizobj.MakeComponentDef( |
+ self.next_component_id, project_id, path, docstring, deprecated, |
+ admin_ids, cc_ids, created, creator_id) |
+ config.component_defs.append(cd) |
+ self.next_component_id += 1 |
+ self.StoreConfig(cnxn, config) |
+ return self.next_component_id - 1 |
+ |
+ def UpdateComponentDef( |
+ self, cnxn, project_id, component_id, path=None, docstring=None, |
+ deprecated=None, admin_ids=None, cc_ids=None, created=None, |
+ creator_id=None, modified=None, modifier_id=None): |
+ config = self.GetProjectConfig(cnxn, project_id) |
+ cd = tracker_bizobj.FindComponentDefByID(component_id, config) |
+ if path is not None: |
+ assert path |
+ cd.path = path |
+ # pylint: disable=multiple-statements |
+ if docstring is not None: cd.docstring = docstring |
+ if deprecated is not None: cd.deprecated = deprecated |
+ if admin_ids is not None: cd.admin_ids = admin_ids |
+ if cc_ids is not None: cd.cc_ids = cc_ids |
+ if created is not None: cd.created = created |
+ if creator_id is not None: cd.creator_id = creator_id |
+ if modified is not None: cd.modified = modified |
+ if modifier_id is not None: cd.modifier_id = modifier_id |
+ self.StoreConfig(cnxn, config) |
+ |
+ def DeleteComponentDef(self, cnxn, project_id, component_id): |
+ """Delete the specified component definition.""" |
+ config = self.GetProjectConfig(cnxn, project_id) |
+ config.component_defs = [ |
+ cd for cd in config.component_defs |
+ if cd.component_id != component_id] |
+ self.StoreConfig(cnxn, config) |
+ |
+ def InvalidateMemcache(self, issues, key_prefix=''): |
+ pass |
+ |
+ |
+class IssueService(object): |
+ """Fake version of IssueService that just works in-RAM.""" |
+ # pylint: disable=unused-argument |
+ |
+ def __init__(self, user_id=None): |
+ self.user_id = user_id |
+ # Dictionary {project_id: issue_pb_dict} |
+ # where issue_pb_dict is a dictionary of the form |
+ # {local_id: issue_pb} |
+ self.issues_by_project = {} |
+ self.issues_by_iid = {} |
+ # Dictionary {project_id: comment_pb_dict} |
+ # where comment_pb_dict is a dictionary of the form |
+ # {local_id: comment_pb_list} |
+ self.comments_by_project = {} |
+ self.comments_by_iid = {} |
+ self.comments_by_cid = {} |
+ self.attachments_by_id = {} |
+ |
+ # Set of issue IDs for issues that have been indexed by calling |
+ # IndexIssues(). |
+ self.indexed_issue_iids = set() |
+ |
+ # Test-only indication that the indexer would have been called |
+ # by the real DITPersist. |
+ self.indexer_called = False |
+ |
+ # Test-only sequence of updated and enqueued. |
+ self.updated_issues = [] |
+ self.enqueued_issues = [] |
+ |
+ # Test-only sequence of expunged issues and projects. |
+ self.expunged_issues = [] |
+ self.expunged_former_locations = [] |
+ self.expunged_local_ids = [] |
+ |
+ # Test-only indicators that methods were called. |
+ self.get_all_issues_in_project_called = False |
+ self.update_issues_called = False |
+ self.enqueue_issues_called = False |
+ |
+ # The next id to return if it is > 0. |
+ self.next_id = -1 |
+ |
+ def UpdateIssues( |
+ self, cnxn, issues, update_cols=None, just_derived=False, |
+ commit=True, invalidate=True): |
+ self.update_issues_called = True |
+ self.updated_issues.extend(issues) |
+ |
+ def EnqueueIssuesForIndexing(self, _cnxn, issues): |
+ self.enqueue_issues_called = True |
+ self.enqueued_issues.extend(issues) |
+ |
+ def ExpungeIssues(self, _cnxn, issue_ids): |
+ self.expunged_issues.extend(issue_ids) |
+ |
+ def ExpungeFormerLocations(self, _cnxn, project_id): |
+ self.expunged_former_locations.append(project_id) |
+ |
+ def ExpungeLocalIDCounters(self, _cnxn, project_id): |
+ self.expunged_local_ids.append(project_id) |
+ |
+ def TestAddIssue(self, issue): |
+ project_id = issue.project_id |
+ self.issues_by_project.setdefault(project_id, {}) |
+ self.issues_by_project[project_id][issue.local_id] = issue |
+ self.issues_by_iid[issue.issue_id] = issue |
+ |
+ # Adding a new issue should add the first comment to the issue |
+ comment = tracker_pb2.IssueComment() |
+ comment.project_id = issue.project_id |
+ comment.issue_id = issue.issue_id |
+ comment.content = issue.summary |
+ comment.timestamp = issue.opened_timestamp |
+ if issue.reporter_id: |
+ comment.user_id = issue.reporter_id |
+ comment.sequence = 0 |
+ self.TestAddComment(comment, issue.local_id) |
+ |
+ def TestAddComment(self, comment, local_id): |
+ pid = comment.project_id |
+ if not comment.id: |
+ comment.id = len(self.comments_by_cid) |
+ |
+ self.comments_by_project.setdefault(pid, {}) |
+ self.comments_by_project[pid].setdefault(local_id, []).append(comment) |
+ self.comments_by_iid.setdefault(comment.issue_id, []).append(comment) |
+ self.comments_by_cid[comment.id] = comment |
+ |
+ def TestAddAttachment(self, attachment, comment_id, issue_id): |
+ if not attachment.attachment_id: |
+ attachment.attachment_id = len(self.attachments_by_id) |
+ |
+ aid = attachment.attachment_id |
+ self.attachments_by_id[aid] = attachment, comment_id, issue_id |
+ comment = self.comments_by_cid[comment_id] |
+ if attachment not in comment.attachments: |
+ comment.attachments.extend([attachment]) |
+ |
+ def GetAttachmentAndContext(self, _cnxn, attachment_id): |
+ if attachment_id in self.attachments_by_id: |
+ attach, comment_id, issue_id = self.attachments_by_id[attachment_id] |
+ if not attach.deleted: |
+ return attach, comment_id, issue_id |
+ |
+ raise issue_svc.NoSuchAttachmentException() |
+ |
+ def GetComments(self, _cnxn, where=None, order_by=None, **kwargs): |
+ # This is a very limited subset of what the real GetComments() can do. |
+ cid = kwargs.get('id') |
+ |
+ comment = self.comments_by_cid.get(cid) |
+ if comment: |
+ return [comment] |
+ else: |
+ return [] |
+ |
+ def GetComment(self, cnxn, comment_id): |
+ """Get the requested comment, or raise an exception.""" |
+ comments = self.GetComments(cnxn, id=comment_id) |
+ if len(comments) == 1: |
+ return comments[0] |
+ |
+ raise issue_svc.NoSuchCommentException() |
+ |
+ def ResolveIssueRefs(self, cnxn, ref_projects, default_project_name, refs): |
+ result = [] |
+ for project_name, local_id in refs: |
+ project = ref_projects.get(project_name or default_project_name) |
+ if not project or project.state == project_pb2.ProjectState.DELETABLE: |
+ continue # ignore any refs to issues in deleted projects |
+ try: |
+ issue = self.GetIssueByLocalID(cnxn, project.project_id, local_id) |
+ result.append(issue.issue_id) |
+ except issue_svc.NoSuchIssueException: |
+ pass # ignore any refs to issues that don't exist |
+ |
+ return result |
+ |
+ def GetAllIssuesInProject(self, _cnxn, project_id, min_local_id=None): |
+ self.get_all_issues_in_project_called = True |
+ if project_id in self.issues_by_project: |
+ return self.issues_by_project[project_id].values() |
+ else: |
+ return [] |
+ |
+ def GetIssuesByLocalIDs( |
+ self, _cnxn, project_id, local_id_list, shard_id=None): |
+ results = [] |
+ for local_id in local_id_list: |
+ if (project_id in self.issues_by_project |
+ and local_id in self.issues_by_project[project_id]): |
+ results.append(self.issues_by_project[project_id][local_id]) |
+ |
+ return results |
+ |
+ def GetIssueByLocalID(self, _cnxn, project_id, local_id): |
+ try: |
+ return self.issues_by_project[project_id][local_id] |
+ except KeyError: |
+ raise issue_svc.NoSuchIssueException() |
+ |
+ def GetAnyOnHandIssue(self, issue_ids, start=None, end=None): |
+ return None # Treat them all like misses. |
+ |
+ def GetIssue(self, _cnxn, issue_id): |
+ if issue_id in self.issues_by_iid: |
+ return self.issues_by_iid[issue_id] |
+ else: |
+ raise issue_svc.NoSuchIssueException() |
+ |
+ def LookupIssueID(self, _cnxn, project_id, local_id): |
+ try: |
+ issue = self.issues_by_project[project_id][local_id] |
+ except KeyError: |
+ raise issue_svc.NoSuchIssueException() |
+ return issue.issue_id |
+ |
+ def GetCommentsForIssue(self, _cnxn, issue_id): |
+ comments = self.comments_by_iid.get(issue_id, []) |
+ for idx, c in enumerate(comments): |
+ c.sequence = idx |
+ |
+ return comments |
+ |
+ def InsertIssue(self, cnxn, issue): |
+ issue.issue_id = issue.project_id * 1000000 + issue.local_id |
+ self.issues_by_project.setdefault(issue.project_id, {}) |
+ self.issues_by_project[issue.project_id][issue.local_id] = issue |
+ self.issues_by_iid[issue.issue_id] = issue |
+ return issue.issue_id |
+ |
+ def CreateIssue( |
+ self, cnxn, services, project_id, |
+ summary, status, owner_id, cc_ids, labels, field_values, |
+ component_ids, reporter_id, marked_description, blocked_on=None, |
+ blocking=None, attachments=None, timestamp=None, index_now=True): |
+ issue = tracker_pb2.Issue() |
+ issue.project_id = project_id |
+ issue.summary = summary |
+ issue.status = status |
+ if owner_id: |
+ issue.owner_id = owner_id |
+ issue.cc_ids.extend(cc_ids) |
+ issue.labels.extend(labels) |
+ issue.field_values.extend(field_values) |
+ issue.reporter_id = reporter_id |
+ if timestamp: |
+ issue.opened_timestamp = timestamp |
+ |
+ if blocked_on: |
+ issue.blocked_on_iids.extend(blocked_on) |
+ if blocking: |
+ issue.blocking.extend(blocking) |
+ |
+ if blocking: |
+ issue.blocking_iids.extend(blocking) |
+ |
+ issue.local_id = self.AllocateNextLocalID(cnxn, project_id) |
+ issue.issue_id = project_id * 1000000 + issue.local_id |
+ |
+ self.TestAddIssue(issue) |
+ self.comments_by_iid[issue.issue_id][0].content = marked_description |
+ return issue.local_id |
+ |
+ def SetUsedLocalID(self, cnxn, project_id): |
+ self.next_id = self.GetHighestLocalID(cnxn, project_id) + 1 |
+ |
+ def AllocateNextLocalID(self, cnxn, project_id): |
+ return self.GetHighestLocalID(cnxn, project_id) + 1 |
+ |
+ def GetHighestLocalID(self, _cnxn, project_id): |
+ if self.next_id > 0: |
+ return self.next_id - 1 |
+ else: |
+ issue_dict = self.issues_by_project.get(project_id, {}) |
+ highest = max([0] + [issue.local_id for issue in issue_dict.itervalues()]) |
+ return highest |
+ |
+ def ApplyIssueComment( |
+ self, cnxn, services, reporter_id, project_id, |
+ local_id, summary, status, owner_id, cc_ids, labels, field_values, |
+ component_ids, blocked_on, blocking, dangling_blocked_on_refs, |
+ dangling_blocking_refs, merged_into, index_now=True, |
+ page_gen_ts=None, comment=None, inbound_message=None, attachments=None, |
+ timestamp=None): |
+ """Feel free to implement a spec-compliant return value.""" |
+ issue = self.issues_by_project[project_id][local_id] |
+ amendments = [] |
+ |
+ if summary and summary != issue.summary: |
+ issue.summary = summary |
+ amendments.append(tracker_bizobj.MakeSummaryAmendment( |
+ summary, issue.summary)) |
+ |
+ if status and status != issue.status: |
+ issue.status = status |
+ amendments.append(tracker_bizobj.MakeStatusAmendment( |
+ status, issue.status)) |
+ |
+ issue.owner_id = owner_id |
+ issue.cc_ids = cc_ids |
+ issue.labels = labels |
+ issue.field_values = field_values |
+ issue.component_ids = component_ids |
+ |
+ issue.blocked_on_iids.extend(blocked_on) |
+ issue.blocking_iids.extend(blocking) |
+ issue.dangling_blocked_on_refs.extend(dangling_blocked_on_refs) |
+ issue.dangling_blocking_refs.extend(dangling_blocking_refs) |
+ |
+ if merged_into is not None: |
+ issue.merged_into = merged_into |
+ |
+ if amendments or (comment and comment.strip()) or attachments: |
+ comment_pb = self.CreateIssueComment( |
+ cnxn, project_id, local_id, reporter_id, comment, |
+ amendments=amendments, inbound_message=inbound_message) |
+ else: |
+ comment_pb = None |
+ |
+ return amendments, comment_pb |
+ |
+ def GetCommentsForIssues(self, _cnxn, issue_ids): |
+ comments_dict = {} |
+ for issue_id in issue_ids: |
+ comments_dict[issue_id] = self.comments_by_iid[issue_id] |
+ |
+ return comments_dict |
+ |
+ def InsertComment(self, cnxn, comment, commit=True): |
+ issue = self.GetIssue(cnxn, comment.issue_id) |
+ self.TestAddComment(comment, issue.local_id) |
+ |
+ # pylint: disable=unused-argument |
+ def DeltaUpdateIssue( |
+ self, cnxn, services, reporter_id, project_id, |
+ config, issue, status, owner_id, cc_add, cc_remove, comp_ids_add, |
+ comp_ids_remove, labels_add, labels_remove, field_vals_add, |
+ field_vals_remove, fields_clear, blocked_on_add=None, |
+ blocked_on_remove=None, blocking_add=None, blocking_remove=None, |
+ merged_into=None, index_now=False, comment=None, summary=None, |
+ iids_to_invalidate=None, rules=None, predicate_asts=None, |
+ timestamp=None): |
+ # Return a bogus amendments list if any of the fields changed |
+ amendments = [] |
+ comment_pb = tracker_pb2.IssueComment() |
+ if (status or owner_id or cc_add or cc_remove or labels_add or |
+ labels_remove or field_vals_add or field_vals_remove or fields_clear or |
+ blocked_on_add or blocked_on_remove or blocking_add or |
+ blocking_remove or merged_into or summary): |
+ amendments.append(tracker_bizobj.MakeStatusAmendment( |
+ 'Updated', issue.status)) |
+ |
+ if not amendments and (not comment or not comment.strip()): |
+ return [], None |
+ |
+ comment_pb = self.CreateIssueComment( |
+ cnxn, project_id, issue.local_id, reporter_id, comment, |
+ amendments=amendments) |
+ |
+ self.indexer_called = index_now |
+ return amendments, comment_pb |
+ |
+ def InvalidateIIDs(self, cnxn, iids_to_invalidate): |
+ pass |
+ |
+ # pylint: disable=unused-argument |
+ def CreateIssueComment( |
+ self, _cnxn, project_id, local_id, user_id, content, |
+ inbound_message=None, amendments=None, attachments=None, timestamp=None, |
+ is_spam=False, commit=True): |
+ # Add a comment to an issue |
+ issue = self.issues_by_project[project_id][local_id] |
+ |
+ comment = tracker_pb2.IssueComment() |
+ comment.id = len(self.comments_by_cid) |
+ comment.project_id = project_id |
+ comment.issue_id = issue.issue_id |
+ comment.content = content |
+ comment.user_id = user_id |
+ if timestamp is not None: |
+ comment.timestamp = timestamp |
+ else: |
+ comment.timestamp = 1234567890 |
+ if amendments: |
+ comment.amendments.extend(amendments) |
+ if inbound_message: |
+ comment.inbound_message = inbound_message |
+ |
+ pid = project_id |
+ self.comments_by_project.setdefault(pid, {}) |
+ self.comments_by_project[pid].setdefault(local_id, []).append(comment) |
+ self.comments_by_iid.setdefault(issue.issue_id, []).append(comment) |
+ self.comments_by_cid[comment.id] = comment |
+ |
+ if attachments: |
+ for filename, filecontent, mimetype in attachments: |
+ aid = len(self.attachments_by_id) |
+ attach = comment.attachments_add( |
+ attachment_id=aid, |
+ filename=filename, |
+ filesize=len(filecontent), |
+ mimetype=mimetype, |
+ blobkey='blob(%s)' % filename) |
+ self.attachments_by_id[aid] = attach, pid, comment.id |
+ |
+ return comment |
+ |
+ def GetOpenAndClosedIssues(self, _cnxn, issue_ids): |
+ open_issues = [] |
+ closed_issues = [] |
+ for issue_id in issue_ids: |
+ try: |
+ issue = self.issues_by_iid[issue_id] |
+ if issue.status == 'Fixed': |
+ closed_issues.append(issue) |
+ else: |
+ open_issues.append(issue) |
+ except KeyError: |
+ continue |
+ |
+ return open_issues, closed_issues |
+ |
+ def GetIssuesDict( |
+ self, _cnxn, issue_ids, use_cache=True, shard_id=None): |
+ return {iid: self.issues_by_iid[iid] for iid in issue_ids} |
+ |
+ def GetIssues(self, _cnxn, issue_ids, use_cache=True, shard_id=None): |
+ results = [self.issues_by_iid[issue_id] for issue_id in issue_ids |
+ if issue_id in self.issues_by_iid] |
+ |
+ return results |
+ |
+ def SoftDeleteIssue( |
+ self, _cnxn, project_id, local_id, deleted, user_service): |
+ issue = self.issues_by_project[project_id][local_id] |
+ issue.deleted = deleted |
+ |
+ def SoftDeleteComment( |
+ self, cnxn, project_id, local_id, sequence_num, |
+ deleted_by_user_id, user_service, delete=True, reindex=True, |
+ is_spam=False): |
+ issue = self.GetIssueByLocalID(cnxn, project_id, local_id) |
+ comments = self.GetCommentsForIssue(cnxn, issue.issue_id) |
+ if not comments: |
+ raise Exception( |
+ 'No comments for issue, project, seq (%s, %s, %s), cannot delete' |
+ % (local_id, project_id, sequence_num)) |
+ if len(comments) < sequence_num: |
+ raise Exception( |
+ 'Attempting to delete comment %s only %s comments created' % |
+ (sequence_num, len(comments))) |
+ comments[sequence_num].is_spam = is_spam |
+ if delete: |
+ comments[sequence_num].deleted_by = deleted_by_user_id |
+ else: |
+ comments[sequence_num].reset('deleted_by') |
+ |
+ def DeleteComponentReferences(self, _cnxn, component_id): |
+ for _, issue in self.issues_by_iid.iteritems(): |
+ issue.component_ids = [ |
+ cid for cid in issue.component_ids if cid != component_id] |
+ |
+ def RunIssueQuery( |
+ self, cnxn, left_joins, where, order_by, shard_id=None, limit=None): |
+ """This always returns empty results. Mock it to test other cases.""" |
+ return [], False |
+ |
+ def GetIIDsByLabelIDs(self, cnxn, label_ids, project_id, shard_id): |
+ """This always returns empty results. Mock it to test other cases.""" |
+ return [] |
+ |
+ def GetIIDsByParticipant(self, cnxn, user_ids, project_ids, shard_id): |
+ """This always returns empty results. Mock it to test other cases.""" |
+ return [] |
+ |
+ def MoveIssues(self, cnxn, dest_project, issues, user_service): |
+ move_to = dest_project.project_id |
+ self.issues_by_project.setdefault(move_to, {}) |
+ for issue in issues: |
+ project_id = issue.project_id |
+ self.issues_by_project[project_id].pop(issue.local_id) |
+ issue.local_id = self.AllocateNextLocalID(cnxn, move_to) |
+ self.issues_by_project[move_to][issue.local_id] = issue |
+ issue.project_id = move_to |
+ return [] |
+ |
+ |
+class SpamService(object): |
+ """Fake version of SpamService that just works in-RAM.""" |
+ |
+ def __init__(self, user_id=None): |
+ self.user_id = user_id |
+ self.reports_by_issue_id = collections.defaultdict(list) |
+ self.comment_reports_by_issue_id = collections.defaultdict(dict) |
+ self.manual_verdicts_by_issue_id = collections.defaultdict(dict) |
+ self.manual_verdicts_by_comment_id = collections.defaultdict(dict) |
+ |
+ def FlagIssues(self, cnxn, issue_service, issues, user_id, flagged_spam): |
+ for issue in issues: |
+ if flagged_spam: |
+ self.reports_by_issue_id[issue.issue_id].append(user_id) |
+ else: |
+ self.reports_by_issue_id[issue.issue_id].remove(user_id) |
+ |
+ def FlagComment(self, cnxn, issue_id, comment_id, reported_user_id, user_id, |
+ flagged_spam): |
+ if not comment_id in self.comment_reports_by_issue_id[issue_id]: |
+ self.comment_reports_by_issue_id[issue_id][comment_id] = [] |
+ if flagged_spam: |
+ self.comment_reports_by_issue_id[issue_id][comment_id].append(user_id) |
+ else: |
+ self.comment_reports_by_issue_id[issue_id][comment_id].remove(user_id) |
+ |
+ def RecordManualIssueVerdicts( |
+ self, cnxn, issue_service, issues, user_id, is_spam): |
+ for issue in issues: |
+ self.manual_verdicts_by_issue_id[issue.issue_id][user_id] = is_spam |
+ |
+ def RecordManualCommentVerdict( |
+ self, cnxn, issue_service, user_service, comment_id, |
+ sequnce_num, user_id, is_spam): |
+ self.manual_verdicts_by_comment_id[comment_id][user_id] = is_spam |
+ |
+ def RecordClassifierIssueVerdict(self, cnxn, issue, is_spam, confidence): |
+ return |
+ |
+ def RecordClassifierCommentVerdict(self, cnxn, issue, is_spam, confidence): |
+ return |
+ |
+ def ClassifyComment(self, comment): |
+ return {'outputLabel': 'ham', |
+ 'outputMulti': [{'label': 'ham', 'score': '1.0'}]} |
+ |
+ def ClassifyIssue(self, issue, firstComment): |
+ return {'outputLabel': 'ham', |
+ 'outputMulti': [{'label': 'ham', 'score': '1.0'}]} |
+ |
+ |
+class FeaturesService(object): |
+ """A fake implementation of FeaturesService.""" |
+ def __init__(self): |
+ # Test-only sequence of expunged projects. |
+ self.expunged_saved_queries = [] |
+ self.expunged_filter_rules = [] |
+ self.expunged_quick_edit = [] |
+ |
+ def ExpungeSavedQueriesExecuteInProject(self, _cnxn, project_id): |
+ self.expunged_saved_queries.append(project_id) |
+ |
+ def ExpungeFilterRules(self, _cnxn, project_id): |
+ self.expunged_filter_rules.append(project_id) |
+ |
+ def ExpungeQuickEditHistory(self, _cnxn, project_id): |
+ self.expunged_quick_edit.append(project_id) |
+ |
+ def GetFilterRules(self, cnxn, project_id): |
+ return [] |
+ |
+ def GetCannedQueriesByProjectID(self, cnxn, project_id): |
+ return [] |
+ |
+ def UpdateCannedQueries(self, cnxn, project_id, canned_queries): |
+ pass |
+ |
+ def GetSubscriptionsInProjects(self, cnxn, project_ids): |
+ return {} |
+ |
+ def GetSavedQuery(self, cnxn, query_id): |
+ return tracker_pb2.SavedQuery() |
+ |
+ |
+class PostData(object): |
+ """A dictionary-like object that also implements getall().""" |
+ |
+ def __init__(self, *args, **kwargs): |
+ self.dictionary = dict(*args, **kwargs) |
+ |
+ def getall(self, key): |
+ """Return all values, assume that the value at key is already a list.""" |
+ return self.dictionary.get(key, []) |
+ |
+ def get(self, key, default=None): |
+ """Return first value, assume that the value at key is already a list.""" |
+ return self.dictionary.get(key, [default])[0] |
+ |
+ def __getitem__(self, key): |
+ """Return first value, assume that the value at key is already a list.""" |
+ return self.dictionary[key][0] |
+ |
+ def __contains__(self, key): |
+ return key in self.dictionary |
+ |
+ def keys(self): |
+ """Return the keys in the POST data.""" |
+ return self.dictionary.keys() |
+ |
+ |
+class FakeFile: |
+ def __init__(self, data=None): |
+ self.data = data |
+ |
+ def read(self): |
+ return self.data |
+ |
+ def write(self, content): |
+ return |
+ |
+ def __enter__(self): |
+ return self |
+ |
+ def __exit__(self, __1, __2, __3): |
+ return None |
+ |
+ |
+def gcs_open(filename, mode): |
+ return FakeFile(filename) |