Index: appengine/monorail/search/frontendsearchpipeline.py |
diff --git a/appengine/monorail/search/frontendsearchpipeline.py b/appengine/monorail/search/frontendsearchpipeline.py |
new file mode 100644 |
index 0000000000000000000000000000000000000000..9bc74b1356dc01414d962b2acb6dcfee2d501458 |
--- /dev/null |
+++ b/appengine/monorail/search/frontendsearchpipeline.py |
@@ -0,0 +1,935 @@ |
+# Copyright 2016 The Chromium Authors. All rights reserved. |
+# Use of this source code is governed by a BSD-style |
+# license that can be found in the LICENSE file or at |
+# https://developers.google.com/open-source/licenses/bsd |
+ |
+"""The FrontendSearchPipeline class manages issue search and sorting. |
+ |
+The frontend pipeline checks memcache for cached results in each shard. It |
+then calls backend jobs to search shards that had a cache miss. On cache hit, |
+the cached results must be filtered by permissions, so the at-risk cache and |
+backends are consulted. Next, the sharded results are combined into an overall |
+list of IIDs. Then, that list is paginated and the issues on the current |
+pagination page can be shown. Alternatively, this class can determine just the |
+position the currently shown issue would occupy in the overall sorted list. |
+""" |
+ |
+import collections |
+import json |
+import logging |
+import math |
+import random |
+import time |
+ |
+from google.appengine.api import apiproxy_stub_map |
+from google.appengine.api import memcache |
+from google.appengine.api import modules |
+from google.appengine.api import urlfetch |
+ |
+import settings |
+from features import savedqueries_helpers |
+from framework import framework_constants |
+from framework import framework_helpers |
+from framework import paginate |
+from framework import sorting |
+from framework import urls |
+from search import query2ast |
+from search import searchpipeline |
+from services import fulltext_helpers |
+from tracker import tracker_bizobj |
+from tracker import tracker_constants |
+from tracker import tracker_helpers |
+ |
+ |
+# Fail-fast responses usually finish in less than 50ms. If we see a failure |
+# in under that amount of time, we don't bother logging it. |
+FAIL_FAST_LIMIT_SEC = 0.050 |
+ |
+# The choices help balance the cost of choosing samples vs. the cost of |
+# selecting issues that are in a range bounded by neighboring samples. |
+# Preferred chunk size parameters were determined by experimentation. |
+MIN_SAMPLE_CHUNK_SIZE = int( |
+ math.sqrt(tracker_constants.DEFAULT_RESULTS_PER_PAGE)) |
+MAX_SAMPLE_CHUNK_SIZE = int(math.sqrt(settings.search_limit_per_shard)) |
+PREFERRED_NUM_CHUNKS = 50 |
+ |
+ |
+class FrontendSearchPipeline(object): |
+ """Manage the process of issue search, including backends and caching. |
+ |
+ Even though the code is divided into several methods, the public |
+ methods should be called in sequence, so the execution of the code |
+ is pretty much in the order of the source code lines here. |
+ """ |
+ |
+ def __init__(self, mr, services, prof, default_results_per_page): |
+ self.mr = mr |
+ self.services = services |
+ self.profiler = prof |
+ self.default_results_per_page = default_results_per_page |
+ self.grid_mode = (mr.mode == 'grid') |
+ self.grid_limited = False |
+ self.pagination = None |
+ self.num_skipped_at_start = 0 |
+ self.total_count = 0 |
+ |
+ self.query_project_names = set() |
+ if mr.query_project_names: |
+ self.query_project_names.update(mr.query_project_names) |
+ |
+ projects = services.project.GetProjectsByName( |
+ mr.cnxn, self.query_project_names).values() |
+ self.query_project_ids = [p.project_id for p in projects] |
+ if mr.project_name: |
+ self.query_project_ids.append(mr.project_id) |
+ self.query_project_names.add(mr.project_name) |
+ |
+ config_dict = self.services.config.GetProjectConfigs( |
+ mr.cnxn, self.query_project_ids) |
+ self.harmonized_config = tracker_bizobj.HarmonizeConfigs( |
+ config_dict.values()) |
+ |
+ # The following fields are filled in as the pipeline progresses. |
+ # The value None means that we still need to compute that value. |
+ self.users_by_id = {} |
+ self.nonviewable_iids = {} # {shard_id: set(iid)} |
+ self.unfiltered_iids = {} # {shard_id: [iid, ...]} needing perm checks. |
+ self.filtered_iids = {} # {shard_id: [iid, ...]} already perm checked. |
+    self.search_limit_reached = {} # {shard_id: bool}. |
+ self.counts = {} |
+ self.allowed_iids = [] # Matching iids that user is permitted to view. |
+ self.allowed_results = None # results that the user is permitted to view. |
+ self.visible_results = None # allowed_results on current pagination page. |
+ self.error_responses = set() |
+ |
+ # Projects that contain the result issues. This starts off as a dict of |
+ # all the query projects, but it can grow based on the found issues in the |
+ # case where the user is searching across the entire site. |
+ self.issue_projects = {p.project_id: p for p in projects} |
+ |
+ error_msg = query2ast.CheckSyntax( |
+ self.mr.query, self.harmonized_config, warnings=self.mr.warnings) |
+ if error_msg: |
+ self.mr.errors.query = error_msg |
+ |
+ def SearchForIIDs(self): |
+ """Use backends to search each shard and store their results.""" |
+ with self.profiler.Phase('Checking cache and calling Backends'): |
+ rpc_tuples = _StartBackendSearch( |
+ self.mr, self.query_project_names, self.query_project_ids, |
+ self.harmonized_config, self.unfiltered_iids, |
+ self.search_limit_reached, self.nonviewable_iids, |
+ self.error_responses, self.services) |
+ |
+ with self.profiler.Phase('Waiting for Backends'): |
+ try: |
+ _FinishBackendSearch(rpc_tuples) |
+ except Exception as e: |
+ logging.exception(e) |
+ raise |
+ |
+ if self.error_responses: |
+ logging.error('%r error responses. Incomplete search results.', |
+ self.error_responses) |
+ |
+ with self.profiler.Phase('Filtering cached results'): |
+ for shard_id in self.unfiltered_iids: |
+ if shard_id not in self.nonviewable_iids: |
+        logging.error( |
+            'Not displaying shard %r because nonviewable_iids is missing', |
+            shard_id) |
+ self.error_responses.add(shard_id) |
+ filtered_shard_iids = [] |
+ else: |
+ unfiltered_shard_iids = self.unfiltered_iids[shard_id] |
+ nonviewable_shard_iids = self.nonviewable_iids[shard_id] |
+ # TODO(jrobbins): avoid creating large temporary lists. |
+ filtered_shard_iids = [iid for iid in unfiltered_shard_iids |
+ if iid not in nonviewable_shard_iids] |
+ if self.grid_mode: |
+ self.filtered_iids[shard_id] = filtered_shard_iids |
+ else: |
+ self.filtered_iids[shard_id] = filtered_shard_iids[ |
+ :self.mr.start + self.mr.num] |
+ self.counts[shard_id] = len(filtered_shard_iids) |
+ |
+ with self.profiler.Phase('Counting all filtered results'): |
+ self.total_count = sum(self.counts.itervalues()) |
+ |
+ def MergeAndSortIssues(self): |
+ """Merge and sort results from all shards into one combined list.""" |
+ with self.profiler.Phase('selecting issues to merge and sort'): |
+ if not self.grid_mode: |
+ self._NarrowFilteredIIDs() |
+ self.allowed_iids = [] |
+ for filtered_shard_iids in self.filtered_iids.itervalues(): |
+ self.allowed_iids.extend(filtered_shard_iids) |
+ |
+ # The grid view is not paginated, so limit the results shown to avoid |
+      # generating an HTML page that would be too large. |
+ limit = settings.max_issues_in_grid |
+ if self.grid_mode and len(self.allowed_iids) > limit: |
+ self.grid_limited = True |
+ self.allowed_iids = self.allowed_iids[:limit] |
+ |
+ with self.profiler.Phase('getting allowed results'): |
+ self.allowed_results = self.services.issue.GetIssues( |
+ self.mr.cnxn, self.allowed_iids) |
+ |
+ # Note: At this point, we have results that are only sorted within |
+ # each backend's shard. We still need to sort the merged result. |
+ self._LookupNeededUsers(self.allowed_results) |
+ with self.profiler.Phase('merging and sorting issues'): |
+ self.allowed_results = _SortIssues( |
+ self.mr, self.allowed_results, self.harmonized_config, |
+ self.users_by_id) |
+ |
+ def _NarrowFilteredIIDs(self): |
+ """Combine filtered shards into a range of IIDs for issues to sort. |
+ |
+    The naive way is to concatenate shard_iids[:start + num] for all |
+    shards and then select [start:start + num]. We do better by sampling |
+    issues and determining which of those samples are known to come |
+    before start or after start + num. We then trim off all those IIDs |
+    and sort a smaller range of IIDs that might actually be displayed. |
+    See the design doc at go/monorail-sorting. |
+ |
+    This method modifies self.filtered_iids and self.num_skipped_at_start. |
+ """ |
+ # Sample issues and skip those that are known to come before start. |
+ # See the "Sorting in Monorail" design doc. |
+ |
+ # If the result set is small, don't bother optimizing it. |
+ orig_length = _TotalLength(self.filtered_iids) |
+ if orig_length < self.mr.num * 4: |
+ return |
+ |
+ # 1. Get sample issues in each shard and sort them all together. |
+ last = self.mr.start + self.mr.num |
+ on_hand_samples = {} |
+ needed_iids = [] |
+ for shard_id in self.filtered_iids: |
+ self._AccumulateSampleIssues( |
+ self.filtered_iids[shard_id], on_hand_samples, needed_iids) |
+ retrieved_samples = self.services.issue.GetIssuesDict( |
+ self.mr.cnxn, needed_iids) |
+ sample_issues = on_hand_samples.values() + retrieved_samples.values() |
+ self._LookupNeededUsers(sample_issues) |
+ sample_issues = _SortIssues( |
+ self.mr, sample_issues, self.harmonized_config, self.users_by_id) |
+ sample_iids = [issue.issue_id for issue in sample_issues] |
+ |
+ # 2. Trim off some IIDs that are sure to be positioned after last. |
+ num_trimmed_end = _TrimEndShardedIIDs(self.filtered_iids, sample_iids, last) |
+ logging.info('Trimmed %r issues from the end of shards', num_trimmed_end) |
+ |
+    # 3. Trim off some IIDs that are sure to be positioned before start. |
+ keep = _TotalLength(self.filtered_iids) - self.mr.start |
+ # Reverse the sharded lists. |
+ _ReverseShards(self.filtered_iids) |
+ sample_iids.reverse() |
+ self.num_skipped_at_start = _TrimEndShardedIIDs( |
+ self.filtered_iids, sample_iids, keep) |
+ logging.info('Trimmed %r issues from the start of shards', |
+ self.num_skipped_at_start) |
+ # Reverse sharded lists again to get back into forward order. |
+ _ReverseShards(self.filtered_iids) |
+ |
+ def DetermineIssuePosition(self, issue): |
+ """Calculate info needed to show the issue flipper. |
+ |
+ Args: |
+ issue: The issue currently being viewed. |
+ |
+ Returns: |
+      A 3-tuple (prev_iid, index, next_iid) where prev_iid is the |
+ IID of the previous issue in the total ordering (or None), |
+ index is the index that the current issue has in the total |
+ ordering, and next_iid is the next issue (or None). If the current |
+ issue is not in the list of results at all, returns None, None, None. |
+ """ |
+ # 1. If the current issue is not in the results at all, then exit. |
+ if not any(issue.issue_id in filtered_shard_iids |
+ for filtered_shard_iids in self.filtered_iids.itervalues()): |
+ return None, None, None |
+ |
+ # 2. Choose and retrieve sample issues in each shard. |
+ samples_by_shard = {} # {shard_id: {iid: issue}} |
+ needed_iids = [] |
+ for shard_id in self.filtered_iids: |
+ samples_by_shard[shard_id] = {} |
+ self._AccumulateSampleIssues( |
+ self.filtered_iids[shard_id], samples_by_shard[shard_id], needed_iids) |
+ retrieved_samples = self.services.issue.GetIssuesDict( |
+ self.mr.cnxn, needed_iids) |
+ for retrieved_iid, retrieved_issue in retrieved_samples.iteritems(): |
+ shard_id = retrieved_iid % settings.num_logical_shards |
+ samples_by_shard[shard_id][retrieved_iid] = retrieved_issue |
+ |
+ # 3. Build up partial results for each shard. |
+    preceding_counts = {} # dict {shard_id: num_issues_preceding_current} |
+ prev_candidates, next_candidates = [], [] |
+ for shard_id in self.filtered_iids: |
+ prev_candidate, index_in_shard, next_candidate = ( |
+ self._DetermineIssuePositionInShard( |
+ shard_id, issue, samples_by_shard[shard_id])) |
+      preceding_counts[shard_id] = index_in_shard |
+ if prev_candidate: |
+ prev_candidates.append(prev_candidate) |
+ if next_candidate: |
+ next_candidates.append(next_candidate) |
+ |
+ # 4. Combine the results. |
+    index = sum(preceding_counts.itervalues()) |
+ prev_candidates = _SortIssues( |
+ self.mr, prev_candidates, self.harmonized_config, self.users_by_id) |
+ prev_iid = prev_candidates[-1].issue_id if prev_candidates else None |
+ next_candidates = _SortIssues( |
+ self.mr, next_candidates, self.harmonized_config, self.users_by_id) |
+ next_iid = next_candidates[0].issue_id if next_candidates else None |
+ |
+ return prev_iid, index, next_iid |
+ |
+ def _DetermineIssuePositionInShard(self, shard_id, issue, sample_dict): |
+ """Determine where the given issue would fit into results from a shard.""" |
+ # See the design doc for details. Basically, it first surveys the results |
+ # to bound a range where the given issue would belong, then it fetches the |
+ # issues in that range and sorts them. |
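+    # For example (hypothetical positions), if the nearest on-hand samples |
+    # bracketing the issue sit at shard positions 200 and 260, only those |
+    # ~60 issues are fetched and sorted to pin down the exact index. |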
+ |
+ filtered_shard_iids = self.filtered_iids[shard_id] |
+ |
+ # 1. Select a sample of issues, leveraging ones we have in RAM already. |
+ issues_on_hand = sample_dict.values() |
+ if issue.issue_id not in sample_dict: |
+ issues_on_hand.append(issue) |
+ |
+ self._LookupNeededUsers(issues_on_hand) |
+ sorted_on_hand = _SortIssues( |
+ self.mr, issues_on_hand, self.harmonized_config, self.users_by_id) |
+ sorted_on_hand_iids = [soh.issue_id for soh in sorted_on_hand] |
+ index_in_on_hand = sorted_on_hand_iids.index(issue.issue_id) |
+ |
+    # 2. Bound the gap around where the issue belongs. |
+ if index_in_on_hand == 0: |
+ fetch_start = 0 |
+ else: |
+ prev_on_hand_iid = sorted_on_hand_iids[index_in_on_hand - 1] |
+ fetch_start = filtered_shard_iids.index(prev_on_hand_iid) + 1 |
+ |
+ if index_in_on_hand == len(sorted_on_hand) - 1: |
+ fetch_end = len(filtered_shard_iids) |
+ else: |
+ next_on_hand_iid = sorted_on_hand_iids[index_in_on_hand + 1] |
+ fetch_end = filtered_shard_iids.index(next_on_hand_iid) |
+ |
+ # 3. Retrieve all the issues in that gap to get an exact answer. |
+ fetched_issues = self.services.issue.GetIssues( |
+ self.mr.cnxn, filtered_shard_iids[fetch_start:fetch_end]) |
+ if issue.issue_id not in filtered_shard_iids[fetch_start:fetch_end]: |
+ fetched_issues.append(issue) |
+ self._LookupNeededUsers(fetched_issues) |
+ sorted_fetched = _SortIssues( |
+ self.mr, fetched_issues, self.harmonized_config, self.users_by_id) |
+ sorted_fetched_iids = [sf.issue_id for sf in sorted_fetched] |
+ index_in_fetched = sorted_fetched_iids.index(issue.issue_id) |
+ |
+ # 4. Find the issues that come immediately before and after the place where |
+ # the given issue would belong in this shard. |
+ if index_in_fetched > 0: |
+ prev_candidate = sorted_fetched[index_in_fetched - 1] |
+ elif index_in_on_hand > 0: |
+ prev_candidate = sorted_on_hand[index_in_on_hand - 1] |
+ else: |
+ prev_candidate = None |
+ |
+ if index_in_fetched < len(sorted_fetched) - 1: |
+ next_candidate = sorted_fetched[index_in_fetched + 1] |
+ elif index_in_on_hand < len(sorted_on_hand) - 1: |
+ next_candidate = sorted_on_hand[index_in_on_hand + 1] |
+ else: |
+ next_candidate = None |
+ |
+ return prev_candidate, fetch_start + index_in_fetched, next_candidate |
+ |
+ def _AccumulateSampleIssues(self, issue_ids, sample_dict, needed_iids): |
+ """Select a scattering of issues from the list, leveraging RAM cache.""" |
+ chunk_size = max(MIN_SAMPLE_CHUNK_SIZE, min(MAX_SAMPLE_CHUNK_SIZE, |
+ int(len(issue_ids) / PREFERRED_NUM_CHUNKS))) |
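+    # For example (hypothetical sizes), with 5,000 issue_ids and |
+    # PREFERRED_NUM_CHUNKS == 50, chunk_size is 100 (subject to the |
+    # MIN/MAX clamping above), so roughly one sample issue is taken from |
+    # each chunk of 100 IIDs. |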
+ for i in range(chunk_size, len(issue_ids), chunk_size): |
+ issue = self.services.issue.GetAnyOnHandIssue( |
+ issue_ids, start=i, end=min(i + chunk_size, len(issue_ids))) |
+ if issue: |
+ sample_dict[issue.issue_id] = issue |
+ else: |
+ needed_iids.append(issue_ids[i]) |
+ |
+ def _LookupNeededUsers(self, issues): |
+ """Look up user info needed to sort issues, if any.""" |
+ with self.profiler.Phase('lookup of owner, reporter, and cc'): |
+ additional_user_views_by_id = ( |
+ tracker_helpers.MakeViewsForUsersInIssues( |
+ self.mr.cnxn, issues, self.services.user, |
+ omit_ids=self.users_by_id.keys())) |
+ self.users_by_id.update(additional_user_views_by_id) |
+ |
+ def Paginate(self): |
+ """Fetch matching issues and paginate the search results. |
+ |
+    These two actions are intertwined because we try to retrieve only |
+    the Issues on the current pagination page. |
+ """ |
+ if self.grid_mode: |
+      # We don't paginate the grid view, but the pagination object still |
+      # shows counts. |
+ self.pagination = paginate.ArtifactPagination( |
+ self.mr, self.allowed_results, self.default_results_per_page, |
+ total_count=self.total_count, list_page_url=urls.ISSUE_LIST) |
+ # We limited the results, but still show the original total count. |
+ self.visible_results = self.allowed_results |
+ |
+ else: |
+ # We already got the issues, just display a slice of the visible ones. |
+ limit_reached = False |
+ for shard_limit_reached in self.search_limit_reached.values(): |
+ limit_reached |= shard_limit_reached |
+ self.pagination = paginate.ArtifactPagination( |
+ self.mr, self.allowed_results, self.default_results_per_page, |
+ total_count=self.total_count, list_page_url=urls.ISSUE_LIST, |
+ limit_reached=limit_reached, skipped=self.num_skipped_at_start) |
+ self.visible_results = self.pagination.visible_results |
+ |
+ # If we were not forced to look up visible users already, do it now. |
+ if self.grid_mode: |
+ self._LookupNeededUsers(self.allowed_results) |
+ else: |
+ self._LookupNeededUsers(self.visible_results) |
+ |
+ def __repr__(self): |
+ """Return a string that shows the internal state of this pipeline.""" |
+ if self.allowed_iids: |
+ shown_allowed_iids = self.allowed_iids[:200] |
+ else: |
+ shown_allowed_iids = self.allowed_iids |
+ |
+ if self.allowed_results: |
+ shown_allowed_results = self.allowed_results[:200] |
+ else: |
+ shown_allowed_results = self.allowed_results |
+ |
+ parts = [ |
+ 'allowed_iids: %r' % shown_allowed_iids, |
+ 'allowed_results: %r' % shown_allowed_results, |
+ 'len(visible_results): %r' % ( |
+ self.visible_results and len(self.visible_results))] |
+ return '%s(%s)' % (self.__class__.__name__, '\n'.join(parts)) |
+ |
+ |
+def _MakeBackendCallback(func, *args): |
+ return lambda: func(*args) |
+ |
+ |
+def _StartBackendSearch( |
+ mr, query_project_names, query_project_ids, harmonized_config, |
+ unfiltered_iids_dict, search_limit_reached_dict, |
+ nonviewable_iids, error_responses, services): |
+ """Request that our backends search and return a list of matching issue IDs. |
+ |
+ Args: |
+ mr: commonly used info parsed from the request, including query and |
+ sort spec. |
+ query_project_names: set of project names to search. |
+ query_project_ids: list of project IDs to search. |
+ harmonized_config: combined ProjectIssueConfig for all projects being |
+ searched. |
+ unfiltered_iids_dict: dict {shard_id: [iid, ...]} of unfiltered search |
+ results to accumulate into. They need to be later filtered by |
+ permissions and merged into filtered_iids_dict. |
+    search_limit_reached_dict: dict {shard_id: bool} to accumulate into, |
+      recording whether the search limit was reached in each shard. |
+ nonviewable_iids: dict {shard_id: set(iid)} of restricted issues in the |
+      projects being searched that the signed-in user cannot view. |
+    error_responses: set of shard IDs whose backend calls failed, used to |
+      note that search results may be incomplete. |
+    services: connections to backends. |
+ |
+ Returns: |
+ A list of rpc_tuples that can be passed to _FinishBackendSearch to wait |
+ on any remaining backend calls. |
+ |
+ SIDE-EFFECTS: |
+ Any data found in memcache is immediately put into unfiltered_iids_dict. |
+ As the backends finish their work, _HandleBackendSearchResponse will update |
+ unfiltered_iids_dict for those shards. |
+ """ |
+ rpc_tuples = [] |
+ needed_shard_ids = set(range(settings.num_logical_shards)) |
+ |
+ # 1. Get whatever we can from memcache. Cache hits are only kept if they are |
+ # not already expired. Each kept cache hit will have unfiltered IIDs, so we |
+ # need to get the at-risk IIDs to efficiently filter them based on perms. |
+ project_shard_timestamps = _GetProjectTimestamps( |
+ query_project_ids, needed_shard_ids) |
+ |
+ if mr.use_cached_searches: |
+ cached_unfiltered_iids_dict, cached_search_limit_reached_dict = ( |
+ _GetCachedSearchResults( |
+ mr, query_project_ids, needed_shard_ids, harmonized_config, |
+ project_shard_timestamps, services)) |
+ unfiltered_iids_dict.update(cached_unfiltered_iids_dict) |
+ search_limit_reached_dict.update(cached_search_limit_reached_dict) |
+ for cache_hit_shard_id in unfiltered_iids_dict: |
+ needed_shard_ids.remove(cache_hit_shard_id) |
+ |
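+  # Note: nonviewable IIDs are needed for every shard, even ones served |
+  # from the cache, because cached results must still be filtered by |
+  # permissions. |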
+ _GetNonviewableIIDs( |
+ query_project_ids, mr.auth.user_id, set(range(settings.num_logical_shards)), |
+ rpc_tuples, nonviewable_iids, project_shard_timestamps, |
+ services.cache_manager.processed_invalidations_up_to, |
+ mr.use_cached_searches) |
+ |
+ # 2. Hit backends for any shards that are still needed. When these results |
+  # come back, they are also put into unfiltered_iids_dict. |
+ for shard_id in needed_shard_ids: |
+ rpc = _StartBackendSearchCall( |
+ mr, query_project_names, shard_id, |
+ services.cache_manager.processed_invalidations_up_to) |
+ rpc_tuple = (time.time(), shard_id, rpc) |
+ rpc.callback = _MakeBackendCallback( |
+ _HandleBackendSearchResponse, mr, query_project_names, rpc_tuple, |
+ rpc_tuples, settings.backend_retries, unfiltered_iids_dict, |
+ search_limit_reached_dict, |
+ services.cache_manager.processed_invalidations_up_to, |
+ error_responses) |
+ rpc_tuples.append(rpc_tuple) |
+ |
+ return rpc_tuples |
+ |
+ |
+def _FinishBackendSearch(rpc_tuples): |
+ """Wait for all backend calls to complete, including any retries.""" |
+ while rpc_tuples: |
+ active_rpcs = [rpc for (_time, _shard_id, rpc) in rpc_tuples] |
+    # Wait for any active RPC to complete. Its callback function will |
+    # be called automatically. |
+ finished_rpc = apiproxy_stub_map.UserRPC.wait_any(active_rpcs) |
+ # Figure out which rpc_tuple finished and remove it from our list. |
+ for rpc_tuple in rpc_tuples: |
+ _time, _shard_id, rpc = rpc_tuple |
+ if rpc == finished_rpc: |
+ rpc_tuples.remove(rpc_tuple) |
+ break |
+ else: |
+ raise ValueError('We somehow finished an RPC that is not in rpc_tuples') |
+ |
+ |
+def _GetProjectTimestamps(query_project_ids, needed_shard_ids): |
+ """Get a dict of modified_ts values for all specified project-shards.""" |
+ project_shard_timestamps = {} |
+ if query_project_ids: |
+ keys = [] |
+ for pid in query_project_ids: |
+ for sid in needed_shard_ids: |
+ keys.append('%d;%d' % (pid, sid)) |
+ else: |
+ keys = [('all;%d' % sid) for sid in needed_shard_ids] |
+ |
+ timestamps_for_project = memcache.get_multi(keys=keys) |
+ for key, timestamp in timestamps_for_project.iteritems(): |
+ pid_str, sid_str = key.split(';') |
+ if pid_str == 'all': |
+ project_shard_timestamps['all', int(sid_str)] = timestamp |
+ else: |
+ project_shard_timestamps[int(pid_str), int(sid_str)] = timestamp |
+ |
+ return project_shard_timestamps |
+ |
+ |
+def _GetNonviewableIIDs( |
+ query_project_ids, logged_in_user_id, needed_shard_ids, rpc_tuples, |
+ nonviewable_iids, project_shard_timestamps, invalidation_timestep, |
+ use_cached_searches): |
+ """Build a set of at-risk IIDs, and accumulate RPCs to get uncached ones.""" |
+ if query_project_ids: |
+ keys = [] |
+ for pid in query_project_ids: |
+ for sid in needed_shard_ids: |
+ keys.append('%d;%d;%d' % (pid, logged_in_user_id, sid)) |
+ else: |
+    keys = [('all;%d;%d' % (logged_in_user_id, sid)) |
+            for sid in needed_shard_ids] |
+ |
+ if use_cached_searches: |
+ cached_dict = memcache.get_multi(keys, key_prefix='nonviewable:') |
+ else: |
+ cached_dict = {} |
+ |
+ for sid in needed_shard_ids: |
+ if query_project_ids: |
+ for pid in query_project_ids: |
+ _AccumulateNonviewableIIDs( |
+ pid, logged_in_user_id, sid, cached_dict, nonviewable_iids, |
+ project_shard_timestamps, rpc_tuples, invalidation_timestep) |
+ else: |
+ _AccumulateNonviewableIIDs( |
+ None, logged_in_user_id, sid, cached_dict, nonviewable_iids, |
+ project_shard_timestamps, rpc_tuples, invalidation_timestep) |
+ |
+ |
+def _AccumulateNonviewableIIDs( |
+ pid, logged_in_user_id, sid, cached_dict, nonviewable_iids, |
+ project_shard_timestamps, rpc_tuples, invalidation_timestep): |
+ """Use one of the retrieved cache entries or call a backend if needed.""" |
+ if pid is None: |
+ key = 'all;%d;%d' % (logged_in_user_id, sid) |
+ else: |
+ key = '%d;%d;%d' % (pid, logged_in_user_id, sid) |
+ |
+ if key in cached_dict: |
+ issue_ids, cached_ts = cached_dict.get(key) |
+ modified_ts = project_shard_timestamps.get((pid, sid)) |
+ if modified_ts is None or modified_ts > cached_ts: |
+ logging.info('nonviewable too stale on (project %r, shard %r)', |
+ pid, sid) |
+ else: |
+ logging.info('adding %d nonviewable issue_ids', len(issue_ids)) |
+ nonviewable_iids[sid] = set(issue_ids) |
+ |
+ if sid not in nonviewable_iids: |
+ logging.info('nonviewable for %r not found', key) |
+ logging.info('starting backend call for nonviewable iids %r', key) |
+ rpc = _StartBackendNonviewableCall( |
+ pid, logged_in_user_id, sid, invalidation_timestep) |
+ rpc_tuple = (time.time(), sid, rpc) |
+ rpc.callback = _MakeBackendCallback( |
+ _HandleBackendNonviewableResponse, pid, logged_in_user_id, sid, |
+ rpc_tuple, rpc_tuples, settings.backend_retries, nonviewable_iids, |
+ invalidation_timestep) |
+ rpc_tuples.append(rpc_tuple) |
+ |
+ |
+def _GetCachedSearchResults( |
+ mr, query_project_ids, needed_shard_ids, harmonized_config, |
+ project_shard_timestamps, services): |
+ """Return a dict of cached search results that are not already stale. |
+ |
+ If it were not for cross-project search, we would simply cache when we do a |
+ search and then invalidate when an issue is modified. But, with |
+ cross-project search we don't know all the memcache entries that would |
+ need to be invalidated. So, instead, we write the search result cache |
+ entries and then an initial modified_ts value for each project if it was |
+ not already there. And, when we update an issue we write a new |
+modified_ts entry, which implicitly invalidates all search result |
+cache entries that were written earlier because they are now stale. When |
+reading from the cache, we discard any cached result that is older than |
+the modified_ts of any project being queried, because it is stale. |
+ |
+ Args: |
+ mr: common information parsed from the request. |
+ query_project_ids: list of project ID numbers for all projects being |
+ searched. |
+ needed_shard_ids: set of shard IDs that need to be checked. |
+    harmonized_config: ProjectIssueConfig with combined information for all |
+ projects involved in this search. |
+ project_shard_timestamps: a dict {(project_id, shard_id): timestamp, ...} |
+ that tells when each shard was last invalidated. |
+ services: connections to backends. |
+ |
+ Returns: |
+ Tuple consisting of: |
+ A dictionary {shard_id: [issue_id, ...], ...} of unfiltered search result |
+ issue IDs. Only shard_ids found in memcache will be in that dictionary. |
+ The result issue IDs must be permission checked before they can be |
+ considered to be part of the user's result set. |
+ A dictionary {shard_id: bool, ...}. The boolean is set to True if |
+ the search results limit of the shard is hit. |
+ """ |
+ projects_str = ','.join(str(pid) for pid in sorted(query_project_ids)) |
+ projects_str = projects_str or 'all' |
+ canned_query = savedqueries_helpers.SavedQueryIDToCond( |
+ mr.cnxn, services.features, mr.can) |
+ logging.info('canned query is %r', canned_query) |
+ canned_query = searchpipeline.ReplaceKeywordsWithUserID( |
+ mr.me_user_id, canned_query) |
+ user_query = searchpipeline.ReplaceKeywordsWithUserID( |
+ mr.me_user_id, mr.query) |
+ |
+ sd = sorting.ComputeSortDirectives(mr, harmonized_config) |
+ memcache_prefix = ';'.join([projects_str, canned_query, user_query, |
+ ' '.join(sd), '']) |
+ cached_dict = memcache.get_multi( |
+ [str(sid) for sid in needed_shard_ids], key_prefix=memcache_prefix) |
+ search_limit_memcache_prefix = ';'.join( |
+ [projects_str, canned_query, user_query, |
+ ' '.join(sd), 'search_limit_reached', '']) |
+ cached_search_limit_reached_dict = memcache.get_multi( |
+ [str(sid) for sid in needed_shard_ids], |
+ key_prefix=search_limit_memcache_prefix) |
+ |
+ unfiltered_dict = {} |
+ search_limit_reached_dict = {} |
+ for shard_id in needed_shard_ids: |
+ if str(shard_id) not in cached_dict: |
+ logging.info('memcache miss on shard %r', shard_id) |
+ continue |
+ |
+ cached_iids, cached_ts = cached_dict[str(shard_id)] |
+ if cached_search_limit_reached_dict.get(str(shard_id)): |
+ search_limit_reached, _ = cached_search_limit_reached_dict[str(shard_id)] |
+ else: |
+ search_limit_reached = False |
+ |
+ stale = False |
+ if query_project_ids: |
+ for project_id in query_project_ids: |
+ modified_ts = project_shard_timestamps.get((project_id, shard_id)) |
+ if modified_ts is None or modified_ts > cached_ts: |
+ stale = True |
+ logging.info('memcache too stale on shard %r because of %r', |
+ shard_id, project_id) |
+ break |
+ else: |
+ modified_ts = project_shard_timestamps.get(('all', shard_id)) |
+ if modified_ts is None or modified_ts > cached_ts: |
+ stale = True |
+ logging.info('memcache too stale on shard %r because of all', |
+ shard_id) |
+ |
+ if not stale: |
+ logging.info('memcache hit on %r', shard_id) |
+ unfiltered_dict[shard_id] = cached_iids |
+ search_limit_reached_dict[shard_id] = search_limit_reached |
+ |
+ return unfiltered_dict, search_limit_reached_dict |
+ |
+ |
+def _MakeBackendRequestHeaders(failfast): |
+ headers = { |
+ # This is needed to allow frontends to talk to backends without going |
+ # through a login screen on googleplex.com. |
+ # http://wiki/Main/PrometheusInternal#Internal_Applications_and_APIs |
+ 'X-URLFetch-Service-Id': 'GOOGLEPLEX', |
+ } |
+ if failfast: |
+ headers['X-AppEngine-FailFast'] = 'Yes' |
+ return headers |
+ |
+ |
+def _StartBackendSearchCall( |
+ mr, query_project_names, shard_id, invalidation_timestep, |
+ deadline=None, failfast=True): |
+ """Ask a backend to query one shard of the database.""" |
+ backend_host = modules.get_hostname(module='besearch') |
+ url = 'http://%s%s' % (backend_host, framework_helpers.FormatURL( |
+ mr, urls.BACKEND_SEARCH, |
+ skip_filtering=True, # TODO(jrobbins): remove after next release. |
+ projects=','.join(query_project_names), |
+ start=0, num=mr.start + mr.num, |
+ logged_in_user_id=mr.auth.user_id or 0, |
+ me_user_id=mr.me_user_id, shard_id=shard_id, |
+ invalidation_timestep=invalidation_timestep)) |
+ logging.info('\n\nCalling backend: %s', url) |
+ rpc = urlfetch.create_rpc( |
+ deadline=deadline or settings.backend_deadline) |
+ headers = _MakeBackendRequestHeaders(failfast) |
+ # follow_redirects=False is needed to avoid a login screen on googleplex. |
+ urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers) |
+ return rpc |
+ |
+ |
+def _StartBackendNonviewableCall( |
+ project_id, logged_in_user_id, shard_id, invalidation_timestep, |
+ deadline=None, failfast=True): |
+ """Ask a backend to query one shard of the database.""" |
+ backend_host = modules.get_hostname(module='besearch') |
+ url = 'http://%s%s' % (backend_host, framework_helpers.FormatURL( |
+ None, urls.BACKEND_NONVIEWABLE, |
+ project_id=project_id or '', |
+ logged_in_user_id=logged_in_user_id or '', |
+ shard_id=shard_id, |
+ invalidation_timestep=invalidation_timestep)) |
+ logging.info('Calling backend nonviewable: %s', url) |
+ rpc = urlfetch.create_rpc(deadline=deadline or settings.backend_deadline) |
+ headers = _MakeBackendRequestHeaders(failfast) |
+ # follow_redirects=False is needed to avoid a login screen on googleplex. |
+ urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers) |
+ return rpc |
+ |
+ |
+def _HandleBackendSearchResponse( |
+ mr, query_project_names, rpc_tuple, rpc_tuples, remaining_retries, |
+ unfiltered_iids, search_limit_reached, invalidation_timestep, |
+ error_responses): |
+ """Process one backend response and retry if there was an error.""" |
+ start_time, shard_id, rpc = rpc_tuple |
+ duration_sec = time.time() - start_time |
+ |
+ try: |
+ response = rpc.get_result() |
+ logging.info('call to backend took %d sec', duration_sec) |
+    # Note that response.content has ")]}'\n" prepended to it. |
+ json_content = response.content[5:] |
+ logging.info('got json text: %r length %r', |
+ json_content[:framework_constants.LOGGING_MAX_LENGTH], |
+ len(json_content)) |
+ json_data = json.loads(json_content) |
+ unfiltered_iids[shard_id] = json_data['unfiltered_iids'] |
+ search_limit_reached[shard_id] = json_data['search_limit_reached'] |
+ |
+ except Exception as e: |
+ if duration_sec > FAIL_FAST_LIMIT_SEC: # Don't log fail-fast exceptions. |
+ logging.exception(e) |
+ if not remaining_retries: |
+ logging.error('backend search retries exceeded') |
+ error_responses.add(shard_id) |
+ return # Used all retries, so give up. |
+ |
+ if duration_sec >= settings.backend_deadline: |
+ logging.error('backend search on %r took too long', shard_id) |
+ error_responses.add(shard_id) |
+ return # That backend shard is overloaded, so give up. |
+ |
+ logging.error('backend call for shard %r failed, retrying', shard_id) |
+ retry_rpc = _StartBackendSearchCall( |
+ mr, query_project_names, shard_id, invalidation_timestep, |
+ failfast=remaining_retries > 2) |
+ retry_rpc_tuple = (time.time(), shard_id, retry_rpc) |
+ retry_rpc.callback = _MakeBackendCallback( |
+ _HandleBackendSearchResponse, mr, query_project_names, |
+ retry_rpc_tuple, rpc_tuples, remaining_retries - 1, unfiltered_iids, |
+ search_limit_reached, invalidation_timestep, error_responses) |
+ rpc_tuples.append(retry_rpc_tuple) |
+ |
+ |
+def _HandleBackendNonviewableResponse( |
+ project_id, logged_in_user_id, shard_id, rpc_tuple, rpc_tuples, |
+ remaining_retries, nonviewable_iids, invalidation_timestep): |
+ """Process one backend response and retry if there was an error.""" |
+ start_time, shard_id, rpc = rpc_tuple |
+ duration_sec = time.time() - start_time |
+ |
+ try: |
+ response = rpc.get_result() |
+ logging.info('call to backend nonviewable took %d sec', duration_sec) |
+    # Note that response.content has ")]}'\n" prepended to it. |
+ json_content = response.content[5:] |
+ logging.info('got json text: %r length %r', |
+ json_content[:framework_constants.LOGGING_MAX_LENGTH], |
+ len(json_content)) |
+ json_data = json.loads(json_content) |
+ nonviewable_iids[shard_id] = set(json_data['nonviewable']) |
+ |
+ except Exception as e: |
+ if duration_sec > FAIL_FAST_LIMIT_SEC: # Don't log fail-fast exceptions. |
+ logging.exception(e) |
+ |
+ if not remaining_retries: |
+      logging.warn('Used all retries, so giving up on shard %r', shard_id) |
+ return |
+ |
+ if duration_sec >= settings.backend_deadline: |
+ logging.error('nonviewable call on %r took too long', shard_id) |
+ return # That backend shard is overloaded, so give up. |
+ |
+ logging.error( |
+ 'backend nonviewable call for shard %r;%r;%r failed, retrying', |
+ project_id, logged_in_user_id, shard_id) |
+ retry_rpc = _StartBackendNonviewableCall( |
+ project_id, logged_in_user_id, shard_id, invalidation_timestep, |
+ failfast=remaining_retries > 2) |
+ retry_rpc_tuple = (time.time(), shard_id, retry_rpc) |
+ retry_rpc.callback = _MakeBackendCallback( |
+ _HandleBackendNonviewableResponse, project_id, logged_in_user_id, |
+ shard_id, retry_rpc_tuple, rpc_tuples, remaining_retries - 1, |
+ nonviewable_iids, invalidation_timestep) |
+ rpc_tuples.append(retry_rpc_tuple) |
+ |
+ |
+def _TotalLength(sharded_iids): |
+ """Return the total length of all issue_iids lists.""" |
+ return sum(len(issue_iids) for issue_iids in sharded_iids.itervalues()) |
+ |
+ |
+def _ReverseShards(sharded_iids): |
+ """Reverse each issue_iids list in place.""" |
+ for shard_id in sharded_iids: |
+ sharded_iids[shard_id].reverse() |
+ |
+ |
+def _TrimEndShardedIIDs(sharded_iids, sample_iids, num_needed): |
+ """Trim the IIDs to keep at least num_needed items. |
+ |
+ Args: |
+ sharded_iids: dict {shard_id: issue_id_list} for search results. This is |
+ modified in place to remove some trailing issue IDs. |
+ sample_iids: list of IIDs from a sorted list of sample issues. |
+ num_needed: int minimum total number of items to keep. Some IIDs that are |
+ known to belong in positions > num_needed will be trimmed off. |
+ |
+ Returns: |
+ The total number of IIDs removed from the IID lists. |
+ """ |
+ # 1. Get (sample_iid, position_in_shard) for each sample. |
+ sample_positions = _CalcSamplePositions(sharded_iids, sample_iids) |
+ |
+ # 2. Walk through the samples, computing a combined lower bound at each |
+ # step until we know that we have passed at least num_needed IIDs. |
+ lower_bound_per_shard = {} |
+ excess_samples = [] |
+ for i in range(len(sample_positions)): |
+ sample_iid, pos = sample_positions[i] |
+ shard_id = sample_iid % settings.num_logical_shards |
+ lower_bound_per_shard[shard_id] = pos |
+ overall_lower_bound = sum(lower_bound_per_shard.itervalues()) |
+ if overall_lower_bound >= num_needed: |
+ excess_samples = sample_positions[i + 1:] |
+ break |
+ else: |
+ return 0 # We went through all samples and never reached num_needed. |
+ |
+ # 3. Truncate each shard at the first excess sample in that shard. |
+ already_trimmed = set() |
+ num_trimmed = 0 |
+ for sample_iid, pos in excess_samples: |
+ shard_id = sample_iid % settings.num_logical_shards |
+ if shard_id not in already_trimmed: |
+ num_trimmed += len(sharded_iids[shard_id]) - pos |
+ sharded_iids[shard_id] = sharded_iids[shard_id][:pos] |
+ already_trimmed.add(shard_id) |
+ |
+ return num_trimmed |
+ |
+ |
+# TODO(jrobbins): Convert this to a python generator. |
+def _CalcSamplePositions(sharded_iids, sample_iids): |
+ """Return [(sample_iid, position_in_shard), ...] for each sample.""" |
+ # We keep track of how far index() has scanned in each shard to avoid |
+ # starting over at position 0 when looking for the next sample in |
+ # the same shard. |
+ scan_positions = collections.defaultdict(lambda: 0) |
+ sample_positions = [] |
+ for sample_iid in sample_iids: |
+ shard_id = sample_iid % settings.num_logical_shards |
+ try: |
+ pos = sharded_iids.get(shard_id, []).index( |
+ sample_iid, scan_positions[shard_id]) |
+ scan_positions[shard_id] = pos |
+ sample_positions.append((sample_iid, pos)) |
+ except ValueError: |
+ pass |
+ |
+ return sample_positions |
+ |
+ |
+def _SortIssues(mr, issues, config, users_by_id): |
+ """Sort the found issues based on the request and config values. |
+ |
+ Args: |
+ mr: common information parsed from the HTTP request. |
+ issues: A list of issues to be sorted. |
+ config: A ProjectIssueConfig that could impact sort order. |
+ users_by_id: dictionary {user_id: user_view,...} for all users who |
+ participate in any issue in the entire list. |
+ |
+ Returns: |
+ A sorted list of issues, based on parameters from mr and config. |
+ """ |
+ issues = sorting.SortArtifacts( |
+ mr, issues, config, tracker_helpers.SORTABLE_FIELDS, |
+ username_cols=tracker_constants.USERNAME_COLS, users_by_id=users_by_id) |
+ return issues |