| Index: appengine/monorail/search/frontendsearchpipeline.py | 
| diff --git a/appengine/monorail/search/frontendsearchpipeline.py b/appengine/monorail/search/frontendsearchpipeline.py | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..9bc74b1356dc01414d962b2acb6dcfee2d501458 | 
| --- /dev/null | 
| +++ b/appengine/monorail/search/frontendsearchpipeline.py | 
| @@ -0,0 +1,935 @@ | 
| +# Copyright 2016 The Chromium Authors. All rights reserved. | 
| +# Use of this source code is govered by a BSD-style | 
| +# license that can be found in the LICENSE file or at | 
| +# https://developers.google.com/open-source/licenses/bsd | 
| + | 
| +"""The FrontendSearchPipeline class manages issue search and sorting. | 
| + | 
| +The frontend pipeline checks memcache for cached results in each shard.  It | 
| +then calls backend jobs to do any shards that had a cache miss.  On cache hit, | 
| +the cached results must be filtered by permissions, so the at-risk cache and | 
| +backends are consulted.  Next, the sharded results are combined into an overall | 
| +list of IIDs.  Then, that list is paginated and the issues on the current | 
| +pagination page can be shown.  Alternatively, this class can determine just the | 
| +position the currently shown issue would occupy in the overall sorted list. | 
| +""" | 
| + | 
| +import json | 
| + | 
| +import collections | 
| +import logging | 
| +import math | 
| +import random | 
| +import time | 
| + | 
| +from google.appengine.api import apiproxy_stub_map | 
| +from google.appengine.api import memcache | 
| +from google.appengine.api import modules | 
| +from google.appengine.api import urlfetch | 
| + | 
| +import settings | 
| +from features import savedqueries_helpers | 
| +from framework import framework_constants | 
| +from framework import framework_helpers | 
| +from framework import paginate | 
| +from framework import sorting | 
| +from framework import urls | 
| +from search import query2ast | 
| +from search import searchpipeline | 
| +from services import fulltext_helpers | 
| +from tracker import tracker_bizobj | 
| +from tracker import tracker_constants | 
| +from tracker import tracker_helpers | 
| + | 
| + | 
| +# Fail-fast responses usually finish in less than 50ms.  If we see a failure | 
| +# in under that amount of time, we don't bother logging it. | 
| +FAIL_FAST_LIMIT_SEC = 0.050 | 
| + | 
| +# The choices help balance the cost of choosing samples vs. the cost of | 
| +# selecting issues that are in a range bounded by neighboring samples. | 
| +# Preferred chunk size parameters were determined by experimentation. | 
| +MIN_SAMPLE_CHUNK_SIZE = int( | 
| +    math.sqrt(tracker_constants.DEFAULT_RESULTS_PER_PAGE)) | 
| +MAX_SAMPLE_CHUNK_SIZE = int(math.sqrt(settings.search_limit_per_shard)) | 
| +PREFERRED_NUM_CHUNKS = 50 | 
| + | 
| + | 
| +class FrontendSearchPipeline(object): | 
| +  """Manage the process of issue search, including backends and caching. | 
| + | 
| +  Even though the code is divided into several methods, the public | 
| +  methods should be called in sequence, so the execution of the code | 
| +  is pretty much in the order of the source code lines here. | 
| +  """ | 
| + | 
| +  def __init__(self, mr, services, prof, default_results_per_page): | 
| +    self.mr = mr | 
| +    self.services = services | 
| +    self.profiler = prof | 
| +    self.default_results_per_page = default_results_per_page | 
| +    self.grid_mode = (mr.mode == 'grid') | 
| +    self.grid_limited = False | 
| +    self.pagination = None | 
| +    self.num_skipped_at_start = 0 | 
| +    self.total_count = 0 | 
| + | 
| +    self.query_project_names = set() | 
| +    if mr.query_project_names: | 
| +      self.query_project_names.update(mr.query_project_names) | 
| + | 
| +    projects = services.project.GetProjectsByName( | 
| +        mr.cnxn, self.query_project_names).values() | 
| +    self.query_project_ids = [p.project_id for p in projects] | 
| +    if mr.project_name: | 
| +      self.query_project_ids.append(mr.project_id) | 
| +      self.query_project_names.add(mr.project_name) | 
| + | 
| +    config_dict = self.services.config.GetProjectConfigs( | 
| +        mr.cnxn, self.query_project_ids) | 
| +    self.harmonized_config = tracker_bizobj.HarmonizeConfigs( | 
| +        config_dict.values()) | 
| + | 
| +    # The following fields are filled in as the pipeline progresses. | 
| +    # The value None means that we still need to compute that value. | 
| +    self.users_by_id = {} | 
| +    self.nonviewable_iids = {}  # {shard_id: set(iid)} | 
| +    self.unfiltered_iids = {}  # {shard_id: [iid, ...]} needing perm checks. | 
| +    self.filtered_iids = {}  # {shard_id: [iid, ...]} already perm checked. | 
| +    self.search_limit_reached = {}  # {shard_id: [bool, ...]}. | 
| +    self.counts = {} | 
| +    self.allowed_iids = []  # Matching iids that user is permitted to view. | 
| +    self.allowed_results = None  # results that the user is permitted to view. | 
| +    self.visible_results = None  # allowed_results on current pagination page. | 
| +    self.error_responses = set() | 
| + | 
| +    # Projects that contain the result issues.  This starts off as a dict of | 
| +    # all the query projects, but it can grow based on the found issues in the | 
| +    # case where the user is searching across the entire site. | 
| +    self.issue_projects = {p.project_id: p for p in projects} | 
| + | 
| +    error_msg = query2ast.CheckSyntax( | 
| +        self.mr.query, self.harmonized_config, warnings=self.mr.warnings) | 
| +    if error_msg: | 
| +      self.mr.errors.query = error_msg | 
| + | 
| +  def SearchForIIDs(self): | 
| +    """Use backends to search each shard and store their results.""" | 
| +    with self.profiler.Phase('Checking cache and calling Backends'): | 
| +      rpc_tuples = _StartBackendSearch( | 
| +          self.mr, self.query_project_names, self.query_project_ids, | 
| +          self.harmonized_config, self.unfiltered_iids, | 
| +          self.search_limit_reached, self.nonviewable_iids, | 
| +          self.error_responses, self.services) | 
| + | 
| +    with self.profiler.Phase('Waiting for Backends'): | 
| +      try: | 
| +        _FinishBackendSearch(rpc_tuples) | 
| +      except Exception as e: | 
| +        logging.exception(e) | 
| +        raise | 
| + | 
| +    if self.error_responses: | 
| +      logging.error('%r error responses. Incomplete search results.', | 
| +                    self.error_responses) | 
| + | 
| +    with self.profiler.Phase('Filtering cached results'): | 
| +      for shard_id in self.unfiltered_iids: | 
| +        if shard_id not in self.nonviewable_iids: | 
| +          logging.error( | 
| +            'Not displaying shard %r because of no nonviewable_iids', shard_id) | 
| +          self.error_responses.add(shard_id) | 
| +          filtered_shard_iids = [] | 
| +        else: | 
| +          unfiltered_shard_iids = self.unfiltered_iids[shard_id] | 
| +          nonviewable_shard_iids = self.nonviewable_iids[shard_id] | 
| +          # TODO(jrobbins): avoid creating large temporary lists. | 
| +          filtered_shard_iids = [iid for iid in unfiltered_shard_iids | 
| +                                 if iid not in nonviewable_shard_iids] | 
| +        if self.grid_mode: | 
| +          self.filtered_iids[shard_id] = filtered_shard_iids | 
| +        else: | 
| +          self.filtered_iids[shard_id] = filtered_shard_iids[ | 
| +              :self.mr.start + self.mr.num] | 
| +        self.counts[shard_id] = len(filtered_shard_iids) | 
| + | 
| +    with self.profiler.Phase('Counting all filtered results'): | 
| +      self.total_count = sum(self.counts.itervalues()) | 
| + | 
| +  def MergeAndSortIssues(self): | 
| +    """Merge and sort results from all shards into one combined list.""" | 
| +    with self.profiler.Phase('selecting issues to merge and sort'): | 
| +      if not self.grid_mode: | 
| +        self._NarrowFilteredIIDs() | 
| +      self.allowed_iids = [] | 
| +      for filtered_shard_iids in self.filtered_iids.itervalues(): | 
| +        self.allowed_iids.extend(filtered_shard_iids) | 
| + | 
| +    # The grid view is not paginated, so limit the results shown to avoid | 
| +    # generating a HTML page that would be too large. | 
| +    limit = settings.max_issues_in_grid | 
| +    if self.grid_mode and len(self.allowed_iids) > limit: | 
| +      self.grid_limited = True | 
| +      self.allowed_iids = self.allowed_iids[:limit] | 
| + | 
| +    with self.profiler.Phase('getting allowed results'): | 
| +      self.allowed_results = self.services.issue.GetIssues( | 
| +          self.mr.cnxn, self.allowed_iids) | 
| + | 
| +    # Note: At this point, we have results that are only sorted within | 
| +    # each backend's shard. We still need to sort the merged result. | 
| +    self._LookupNeededUsers(self.allowed_results) | 
| +    with self.profiler.Phase('merging and sorting issues'): | 
| +      self.allowed_results = _SortIssues( | 
| +          self.mr, self.allowed_results, self.harmonized_config, | 
| +          self.users_by_id) | 
| + | 
| +  def _NarrowFilteredIIDs(self): | 
| +    """Combine filtered shards into a range of IIDs for issues to sort. | 
| + | 
| +    The niave way is to concatenate shard_iids[:start + num] for all | 
| +    shards then select [start:start + num].  We do better by sampling | 
| +    issues and then determining which of those samples are known to | 
| +    come before start or after start+num.  We then trim off all those IIDs | 
| +    and sort a smaller range of IIDs that might actuall be displayed. | 
| +    See the design doc at go/monorail-sorting. | 
| + | 
| +    This method modifies self.fitered_iids and self.num_skipped_at_start. | 
| +    """ | 
| +    # Sample issues and skip those that are known to come before start. | 
| +    # See the "Sorting in Monorail" design doc. | 
| + | 
| +    # If the result set is small, don't bother optimizing it. | 
| +    orig_length = _TotalLength(self.filtered_iids) | 
| +    if orig_length < self.mr.num * 4: | 
| +      return | 
| + | 
| +    # 1. Get sample issues in each shard and sort them all together. | 
| +    last = self.mr.start + self.mr.num | 
| +    on_hand_samples = {} | 
| +    needed_iids = [] | 
| +    for shard_id in self.filtered_iids: | 
| +      self._AccumulateSampleIssues( | 
| +          self.filtered_iids[shard_id], on_hand_samples, needed_iids) | 
| +    retrieved_samples = self.services.issue.GetIssuesDict( | 
| +        self.mr.cnxn, needed_iids) | 
| +    sample_issues = on_hand_samples.values() + retrieved_samples.values() | 
| +    self._LookupNeededUsers(sample_issues) | 
| +    sample_issues = _SortIssues( | 
| +        self.mr, sample_issues, self.harmonized_config, self.users_by_id) | 
| +    sample_iids = [issue.issue_id for issue in sample_issues] | 
| + | 
| +    # 2. Trim off some IIDs that are sure to be positioned after last. | 
| +    num_trimmed_end = _TrimEndShardedIIDs(self.filtered_iids, sample_iids, last) | 
| +    logging.info('Trimmed %r issues from the end of shards', num_trimmed_end) | 
| + | 
| +    # 3. Trim off some IIDs that are sure to be posiitoned before start. | 
| +    keep = _TotalLength(self.filtered_iids) - self.mr.start | 
| +    # Reverse the sharded lists. | 
| +    _ReverseShards(self.filtered_iids) | 
| +    sample_iids.reverse() | 
| +    self.num_skipped_at_start = _TrimEndShardedIIDs( | 
| +        self.filtered_iids, sample_iids, keep) | 
| +    logging.info('Trimmed %r issues from the start of shards', | 
| +                 self.num_skipped_at_start) | 
| +    # Reverse sharded lists again to get back into forward order. | 
| +    _ReverseShards(self.filtered_iids) | 
| + | 
| +  def DetermineIssuePosition(self, issue): | 
| +    """Calculate info needed to show the issue flipper. | 
| + | 
| +    Args: | 
| +      issue: The issue currently being viewed. | 
| + | 
| +    Returns: | 
| +      A 3-tuple (prev_iid, index, next_iid) were prev_iid is the | 
| +      IID of the previous issue in the total ordering (or None), | 
| +      index is the index that the current issue has in the total | 
| +      ordering, and next_iid is the next issue (or None).  If the current | 
| +      issue is not in the list of results at all, returns None, None, None. | 
| +    """ | 
| +    # 1. If the current issue is not in the results at all, then exit. | 
| +    if not any(issue.issue_id in filtered_shard_iids | 
| +               for filtered_shard_iids in self.filtered_iids.itervalues()): | 
| +      return None, None, None | 
| + | 
| +    # 2. Choose and retrieve sample issues in each shard. | 
| +    samples_by_shard = {}  # {shard_id: {iid: issue}} | 
| +    needed_iids = [] | 
| +    for shard_id in self.filtered_iids: | 
| +      samples_by_shard[shard_id] = {} | 
| +      self._AccumulateSampleIssues( | 
| +          self.filtered_iids[shard_id], samples_by_shard[shard_id], needed_iids) | 
| +    retrieved_samples = self.services.issue.GetIssuesDict( | 
| +        self.mr.cnxn, needed_iids) | 
| +    for retrieved_iid, retrieved_issue in retrieved_samples.iteritems(): | 
| +      shard_id = retrieved_iid % settings.num_logical_shards | 
| +      samples_by_shard[shard_id][retrieved_iid] = retrieved_issue | 
| + | 
| +    # 3. Build up partial results for each shard. | 
| +    preceeding_counts = {}  # dict {shard_id: num_issues_preceeding_current} | 
| +    prev_candidates, next_candidates = [], [] | 
| +    for shard_id in self.filtered_iids: | 
| +      prev_candidate, index_in_shard, next_candidate = ( | 
| +          self._DetermineIssuePositionInShard( | 
| +              shard_id, issue, samples_by_shard[shard_id])) | 
| +      preceeding_counts[shard_id] = index_in_shard | 
| +      if prev_candidate: | 
| +        prev_candidates.append(prev_candidate) | 
| +      if next_candidate: | 
| +        next_candidates.append(next_candidate) | 
| + | 
| +    # 4. Combine the results. | 
| +    index = sum(preceeding_counts.itervalues()) | 
| +    prev_candidates = _SortIssues( | 
| +        self.mr, prev_candidates, self.harmonized_config, self.users_by_id) | 
| +    prev_iid = prev_candidates[-1].issue_id if prev_candidates else None | 
| +    next_candidates = _SortIssues( | 
| +        self.mr, next_candidates, self.harmonized_config, self.users_by_id) | 
| +    next_iid = next_candidates[0].issue_id if next_candidates else None | 
| + | 
| +    return prev_iid, index, next_iid | 
| + | 
| +  def _DetermineIssuePositionInShard(self, shard_id, issue, sample_dict): | 
| +    """Determine where the given issue would fit into results from a shard.""" | 
| +    # See the design doc for details.  Basically, it first surveys the results | 
| +    # to bound a range where the given issue would belong, then it fetches the | 
| +    # issues in that range and sorts them. | 
| + | 
| +    filtered_shard_iids = self.filtered_iids[shard_id] | 
| + | 
| +    # 1. Select a sample of issues, leveraging ones we have in RAM already. | 
| +    issues_on_hand = sample_dict.values() | 
| +    if issue.issue_id not in sample_dict: | 
| +      issues_on_hand.append(issue) | 
| + | 
| +    self._LookupNeededUsers(issues_on_hand) | 
| +    sorted_on_hand = _SortIssues( | 
| +        self.mr, issues_on_hand, self.harmonized_config, self.users_by_id) | 
| +    sorted_on_hand_iids = [soh.issue_id for soh in sorted_on_hand] | 
| +    index_in_on_hand = sorted_on_hand_iids.index(issue.issue_id) | 
| + | 
| +    # 2. Bound the gap around where issue belongs. | 
| +    if index_in_on_hand == 0: | 
| +      fetch_start = 0 | 
| +    else: | 
| +      prev_on_hand_iid = sorted_on_hand_iids[index_in_on_hand - 1] | 
| +      fetch_start = filtered_shard_iids.index(prev_on_hand_iid) + 1 | 
| + | 
| +    if index_in_on_hand == len(sorted_on_hand) - 1: | 
| +      fetch_end = len(filtered_shard_iids) | 
| +    else: | 
| +      next_on_hand_iid = sorted_on_hand_iids[index_in_on_hand + 1] | 
| +      fetch_end = filtered_shard_iids.index(next_on_hand_iid) | 
| + | 
| +    # 3. Retrieve all the issues in that gap to get an exact answer. | 
| +    fetched_issues = self.services.issue.GetIssues( | 
| +        self.mr.cnxn, filtered_shard_iids[fetch_start:fetch_end]) | 
| +    if issue.issue_id not in filtered_shard_iids[fetch_start:fetch_end]: | 
| +      fetched_issues.append(issue) | 
| +    self._LookupNeededUsers(fetched_issues) | 
| +    sorted_fetched = _SortIssues( | 
| +        self.mr, fetched_issues, self.harmonized_config, self.users_by_id) | 
| +    sorted_fetched_iids = [sf.issue_id for sf in sorted_fetched] | 
| +    index_in_fetched = sorted_fetched_iids.index(issue.issue_id) | 
| + | 
| +    # 4. Find the issues that come immediately before and after the place where | 
| +    # the given issue would belong in this shard. | 
| +    if index_in_fetched > 0: | 
| +      prev_candidate = sorted_fetched[index_in_fetched - 1] | 
| +    elif index_in_on_hand > 0: | 
| +      prev_candidate = sorted_on_hand[index_in_on_hand - 1] | 
| +    else: | 
| +      prev_candidate = None | 
| + | 
| +    if index_in_fetched < len(sorted_fetched) - 1: | 
| +      next_candidate = sorted_fetched[index_in_fetched + 1] | 
| +    elif index_in_on_hand < len(sorted_on_hand) - 1: | 
| +      next_candidate = sorted_on_hand[index_in_on_hand + 1] | 
| +    else: | 
| +      next_candidate = None | 
| + | 
| +    return prev_candidate, fetch_start + index_in_fetched, next_candidate | 
| + | 
| +  def _AccumulateSampleIssues(self, issue_ids, sample_dict, needed_iids): | 
| +    """Select a scattering of issues from the list, leveraging RAM cache.""" | 
| +    chunk_size = max(MIN_SAMPLE_CHUNK_SIZE, min(MAX_SAMPLE_CHUNK_SIZE, | 
| +        int(len(issue_ids) / PREFERRED_NUM_CHUNKS))) | 
| +    for i in range(chunk_size, len(issue_ids), chunk_size): | 
| +      issue = self.services.issue.GetAnyOnHandIssue( | 
| +          issue_ids, start=i, end=min(i + chunk_size, len(issue_ids))) | 
| +      if issue: | 
| +        sample_dict[issue.issue_id] = issue | 
| +      else: | 
| +        needed_iids.append(issue_ids[i]) | 
| + | 
| +  def _LookupNeededUsers(self, issues): | 
| +    """Look up user info needed to sort issues, if any.""" | 
| +    with self.profiler.Phase('lookup of owner, reporter, and cc'): | 
| +      additional_user_views_by_id = ( | 
| +          tracker_helpers.MakeViewsForUsersInIssues( | 
| +              self.mr.cnxn, issues, self.services.user, | 
| +              omit_ids=self.users_by_id.keys())) | 
| +      self.users_by_id.update(additional_user_views_by_id) | 
| + | 
| +  def Paginate(self): | 
| +    """Fetch matching issues and paginate the search results. | 
| + | 
| +    These two actions are intertwined because we try to only | 
| +    retrieve the Issues on the current pagination page. | 
| +    """ | 
| +    if self.grid_mode: | 
| +      # We don't paginate the grid view.  But, pagination object shows counts. | 
| +      self.pagination = paginate.ArtifactPagination( | 
| +          self.mr, self.allowed_results, self.default_results_per_page, | 
| +          total_count=self.total_count, list_page_url=urls.ISSUE_LIST) | 
| +      # We limited the results, but still show the original total count. | 
| +      self.visible_results = self.allowed_results | 
| + | 
| +    else: | 
| +      # We already got the issues, just display a slice of the visible ones. | 
| +      limit_reached = False | 
| +      for shard_limit_reached in self.search_limit_reached.values(): | 
| +        limit_reached |= shard_limit_reached | 
| +      self.pagination = paginate.ArtifactPagination( | 
| +          self.mr, self.allowed_results, self.default_results_per_page, | 
| +          total_count=self.total_count, list_page_url=urls.ISSUE_LIST, | 
| +          limit_reached=limit_reached, skipped=self.num_skipped_at_start) | 
| +      self.visible_results = self.pagination.visible_results | 
| + | 
| +    # If we were not forced to look up visible users already, do it now. | 
| +    if self.grid_mode: | 
| +      self._LookupNeededUsers(self.allowed_results) | 
| +    else: | 
| +      self._LookupNeededUsers(self.visible_results) | 
| + | 
| +  def __repr__(self): | 
| +    """Return a string that shows the internal state of this pipeline.""" | 
| +    if self.allowed_iids: | 
| +      shown_allowed_iids = self.allowed_iids[:200] | 
| +    else: | 
| +      shown_allowed_iids = self.allowed_iids | 
| + | 
| +    if self.allowed_results: | 
| +      shown_allowed_results = self.allowed_results[:200] | 
| +    else: | 
| +      shown_allowed_results = self.allowed_results | 
| + | 
| +    parts = [ | 
| +        'allowed_iids: %r' % shown_allowed_iids, | 
| +        'allowed_results: %r' % shown_allowed_results, | 
| +        'len(visible_results): %r' % ( | 
| +            self.visible_results and len(self.visible_results))] | 
| +    return '%s(%s)' % (self.__class__.__name__, '\n'.join(parts)) | 
| + | 
| + | 
| +def _MakeBackendCallback(func, *args): | 
| +  return lambda: func(*args) | 
| + | 
| + | 
| +def _StartBackendSearch( | 
| +    mr, query_project_names, query_project_ids, harmonized_config, | 
| +    unfiltered_iids_dict, search_limit_reached_dict, | 
| +    nonviewable_iids, error_responses, services): | 
| +  """Request that our backends search and return a list of matching issue IDs. | 
| + | 
| +  Args: | 
| +    mr: commonly used info parsed from the request, including query and | 
| +        sort spec. | 
| +    query_project_names: set of project names to search. | 
| +    query_project_ids: list of project IDs to search. | 
| +    harmonized_config: combined ProjectIssueConfig for all projects being | 
| +        searched. | 
| +    unfiltered_iids_dict: dict {shard_id: [iid, ...]} of unfiltered search | 
| +        results to accumulate into.  They need to be later filtered by | 
| +        permissions and merged into filtered_iids_dict. | 
| +    search_limit_reached_dict: dict{shard_id: [bool, ...]} to determine if | 
| +        the search limit of any shard was reached. | 
| +    nonviewable_iids: dict {shard_id: set(iid)} of restricted issues in the | 
| +        projects being searched that the signed in user cannot view. | 
| +    services: connections to backends. | 
| + | 
| +  Returns: | 
| +    A list of rpc_tuples that can be passed to _FinishBackendSearch to wait | 
| +    on any remaining backend calls. | 
| + | 
| +  SIDE-EFFECTS: | 
| +    Any data found in memcache is immediately put into unfiltered_iids_dict. | 
| +    As the backends finish their work, _HandleBackendSearchResponse will update | 
| +    unfiltered_iids_dict for those shards. | 
| +  """ | 
| +  rpc_tuples = [] | 
| +  needed_shard_ids = set(range(settings.num_logical_shards)) | 
| + | 
| +  # 1. Get whatever we can from memcache.  Cache hits are only kept if they are | 
| +  # not already expired.  Each kept cache hit will have unfiltered IIDs, so we | 
| +  # need to get the at-risk IIDs to efficiently filter them based on perms. | 
| +  project_shard_timestamps = _GetProjectTimestamps( | 
| +      query_project_ids, needed_shard_ids) | 
| + | 
| +  if mr.use_cached_searches: | 
| +    cached_unfiltered_iids_dict, cached_search_limit_reached_dict = ( | 
| +        _GetCachedSearchResults( | 
| +            mr, query_project_ids, needed_shard_ids, harmonized_config, | 
| +            project_shard_timestamps, services)) | 
| +    unfiltered_iids_dict.update(cached_unfiltered_iids_dict) | 
| +    search_limit_reached_dict.update(cached_search_limit_reached_dict) | 
| +  for cache_hit_shard_id in unfiltered_iids_dict: | 
| +    needed_shard_ids.remove(cache_hit_shard_id) | 
| + | 
| +  _GetNonviewableIIDs( | 
| +    query_project_ids, mr.auth.user_id, set(range(settings.num_logical_shards)), | 
| +    rpc_tuples, nonviewable_iids, project_shard_timestamps, | 
| +    services.cache_manager.processed_invalidations_up_to, | 
| +    mr.use_cached_searches) | 
| + | 
| +  # 2. Hit backends for any shards that are still needed.  When these results | 
| +  # come back, they are also put into unfiltered_iids_dict.. | 
| +  for shard_id in needed_shard_ids: | 
| +    rpc = _StartBackendSearchCall( | 
| +        mr, query_project_names, shard_id, | 
| +        services.cache_manager.processed_invalidations_up_to) | 
| +    rpc_tuple = (time.time(), shard_id, rpc) | 
| +    rpc.callback = _MakeBackendCallback( | 
| +        _HandleBackendSearchResponse, mr, query_project_names, rpc_tuple, | 
| +        rpc_tuples, settings.backend_retries, unfiltered_iids_dict, | 
| +        search_limit_reached_dict, | 
| +        services.cache_manager.processed_invalidations_up_to, | 
| +        error_responses) | 
| +    rpc_tuples.append(rpc_tuple) | 
| + | 
| +  return rpc_tuples | 
| + | 
| + | 
| +def _FinishBackendSearch(rpc_tuples): | 
| +  """Wait for all backend calls to complete, including any retries.""" | 
| +  while rpc_tuples: | 
| +    active_rpcs = [rpc for (_time, _shard_id, rpc) in rpc_tuples] | 
| +    # Wait for any active RPC to complete.  It's callback function will | 
| +    # automatically be called. | 
| +    finished_rpc = apiproxy_stub_map.UserRPC.wait_any(active_rpcs) | 
| +    # Figure out which rpc_tuple finished and remove it from our list. | 
| +    for rpc_tuple in rpc_tuples: | 
| +      _time, _shard_id, rpc = rpc_tuple | 
| +      if rpc == finished_rpc: | 
| +        rpc_tuples.remove(rpc_tuple) | 
| +        break | 
| +    else: | 
| +      raise ValueError('We somehow finished an RPC that is not in rpc_tuples') | 
| + | 
| + | 
| +def _GetProjectTimestamps(query_project_ids, needed_shard_ids): | 
| +  """Get a dict of modified_ts values for all specified project-shards.""" | 
| +  project_shard_timestamps = {} | 
| +  if query_project_ids: | 
| +    keys = [] | 
| +    for pid in query_project_ids: | 
| +      for sid in needed_shard_ids: | 
| +        keys.append('%d;%d' % (pid, sid)) | 
| +  else: | 
| +    keys = [('all;%d' % sid) for sid in needed_shard_ids] | 
| + | 
| +  timestamps_for_project = memcache.get_multi(keys=keys) | 
| +  for key, timestamp in timestamps_for_project.iteritems(): | 
| +    pid_str, sid_str = key.split(';') | 
| +    if pid_str == 'all': | 
| +      project_shard_timestamps['all', int(sid_str)] = timestamp | 
| +    else: | 
| +      project_shard_timestamps[int(pid_str), int(sid_str)] = timestamp | 
| + | 
| +  return project_shard_timestamps | 
| + | 
| + | 
| +def _GetNonviewableIIDs( | 
| +    query_project_ids, logged_in_user_id, needed_shard_ids, rpc_tuples, | 
| +    nonviewable_iids, project_shard_timestamps, invalidation_timestep, | 
| +    use_cached_searches): | 
| +  """Build a set of at-risk IIDs, and accumulate RPCs to get uncached ones.""" | 
| +  if query_project_ids: | 
| +    keys = [] | 
| +    for pid in query_project_ids: | 
| +      for sid in needed_shard_ids: | 
| +        keys.append('%d;%d;%d' % (pid, logged_in_user_id, sid)) | 
| +  else: | 
| +    keys = [('all;%d;%d' % sid) | 
| +            for (logged_in_user_id, sid) in needed_shard_ids] | 
| + | 
| +  if use_cached_searches: | 
| +    cached_dict = memcache.get_multi(keys, key_prefix='nonviewable:') | 
| +  else: | 
| +    cached_dict = {} | 
| + | 
| +  for sid in needed_shard_ids: | 
| +    if query_project_ids: | 
| +      for pid in query_project_ids: | 
| +        _AccumulateNonviewableIIDs( | 
| +            pid, logged_in_user_id, sid, cached_dict, nonviewable_iids, | 
| +            project_shard_timestamps, rpc_tuples, invalidation_timestep) | 
| +    else: | 
| +      _AccumulateNonviewableIIDs( | 
| +          None, logged_in_user_id, sid, cached_dict, nonviewable_iids, | 
| +          project_shard_timestamps, rpc_tuples, invalidation_timestep) | 
| + | 
| + | 
| +def _AccumulateNonviewableIIDs( | 
| +    pid, logged_in_user_id, sid, cached_dict, nonviewable_iids, | 
| +    project_shard_timestamps, rpc_tuples, invalidation_timestep): | 
| +  """Use one of the retrieved cache entries or call a backend if needed.""" | 
| +  if pid is None: | 
| +    key = 'all;%d;%d' % (logged_in_user_id, sid) | 
| +  else: | 
| +    key = '%d;%d;%d' % (pid, logged_in_user_id, sid) | 
| + | 
| +  if key in cached_dict: | 
| +    issue_ids, cached_ts = cached_dict.get(key) | 
| +    modified_ts = project_shard_timestamps.get((pid, sid)) | 
| +    if modified_ts is None or modified_ts > cached_ts: | 
| +      logging.info('nonviewable too stale on (project %r, shard %r)', | 
| +                   pid, sid) | 
| +    else: | 
| +      logging.info('adding %d nonviewable issue_ids', len(issue_ids)) | 
| +      nonviewable_iids[sid] = set(issue_ids) | 
| + | 
| +  if sid not in nonviewable_iids: | 
| +    logging.info('nonviewable for %r not found', key) | 
| +    logging.info('starting backend call for nonviewable iids %r', key) | 
| +    rpc = _StartBackendNonviewableCall( | 
| +      pid, logged_in_user_id, sid, invalidation_timestep) | 
| +    rpc_tuple = (time.time(), sid, rpc) | 
| +    rpc.callback = _MakeBackendCallback( | 
| +        _HandleBackendNonviewableResponse, pid, logged_in_user_id, sid, | 
| +        rpc_tuple, rpc_tuples, settings.backend_retries, nonviewable_iids, | 
| +        invalidation_timestep) | 
| +    rpc_tuples.append(rpc_tuple) | 
| + | 
| + | 
| +def _GetCachedSearchResults( | 
| +    mr, query_project_ids, needed_shard_ids, harmonized_config, | 
| +    project_shard_timestamps, services): | 
| +  """Return a dict of cached search results that are not already stale. | 
| + | 
| +  If it were not for cross-project search, we would simply cache when we do a | 
| +  search and then invalidate when an issue is modified.  But, with | 
| +  cross-project search we don't know all the memcache entries that would | 
| +  need to be invalidated.  So, instead, we write the search result cache | 
| +  entries and then an initial modified_ts value for each project if it was | 
| +  not already there. And, when we update an issue we write a new | 
| +  modified_ts entry, which implicitly invalidate all search result | 
| +  cache entries that were written earlier because they are now stale.  When | 
| +  reading from the cache, we ignore any query project with modified_ts | 
| +  after its search result cache timestamp, because it is stale. | 
| + | 
| +  Args: | 
| +    mr: common information parsed from the request. | 
| +    query_project_ids: list of project ID numbers for all projects being | 
| +        searched. | 
| +    needed_shard_ids: set of shard IDs that need to be checked. | 
| +    harmonized_config: ProjectIsueConfig with combined information for all | 
| +        projects involved in this search. | 
| +    project_shard_timestamps: a dict {(project_id, shard_id): timestamp, ...} | 
| +        that tells when each shard was last invalidated. | 
| +    services: connections to backends. | 
| + | 
| +  Returns: | 
| +    Tuple consisting of: | 
| +      A dictionary {shard_id: [issue_id, ...], ...} of unfiltered search result | 
| +      issue IDs. Only shard_ids found in memcache will be in that dictionary. | 
| +      The result issue IDs must be permission checked before they can be | 
| +      considered to be part of the user's result set. | 
| +      A dictionary {shard_id: bool, ...}. The boolean is set to True if | 
| +      the search results limit of the shard is hit. | 
| +  """ | 
| +  projects_str = ','.join(str(pid) for pid in sorted(query_project_ids)) | 
| +  projects_str = projects_str or 'all' | 
| +  canned_query = savedqueries_helpers.SavedQueryIDToCond( | 
| +      mr.cnxn, services.features, mr.can) | 
| +  logging.info('canned query is %r', canned_query) | 
| +  canned_query = searchpipeline.ReplaceKeywordsWithUserID( | 
| +      mr.me_user_id, canned_query) | 
| +  user_query = searchpipeline.ReplaceKeywordsWithUserID( | 
| +      mr.me_user_id, mr.query) | 
| + | 
| +  sd = sorting.ComputeSortDirectives(mr, harmonized_config) | 
| +  memcache_prefix = ';'.join([projects_str, canned_query, user_query, | 
| +                              ' '.join(sd), '']) | 
| +  cached_dict = memcache.get_multi( | 
| +      [str(sid) for sid in needed_shard_ids], key_prefix=memcache_prefix) | 
| +  search_limit_memcache_prefix = ';'.join( | 
| +      [projects_str, canned_query, user_query, | 
| +       ' '.join(sd), 'search_limit_reached', '']) | 
| +  cached_search_limit_reached_dict = memcache.get_multi( | 
| +      [str(sid) for sid in needed_shard_ids], | 
| +      key_prefix=search_limit_memcache_prefix) | 
| + | 
| +  unfiltered_dict = {} | 
| +  search_limit_reached_dict = {} | 
| +  for shard_id in needed_shard_ids: | 
| +    if str(shard_id) not in cached_dict: | 
| +      logging.info('memcache miss on shard %r', shard_id) | 
| +      continue | 
| + | 
| +    cached_iids, cached_ts = cached_dict[str(shard_id)] | 
| +    if cached_search_limit_reached_dict.get(str(shard_id)): | 
| +      search_limit_reached, _ = cached_search_limit_reached_dict[str(shard_id)] | 
| +    else: | 
| +      search_limit_reached = False | 
| + | 
| +    stale = False | 
| +    if query_project_ids: | 
| +      for project_id in query_project_ids: | 
| +        modified_ts = project_shard_timestamps.get((project_id, shard_id)) | 
| +        if modified_ts is None or modified_ts > cached_ts: | 
| +          stale = True | 
| +          logging.info('memcache too stale on shard %r because of %r', | 
| +                       shard_id, project_id) | 
| +          break | 
| +    else: | 
| +      modified_ts = project_shard_timestamps.get(('all', shard_id)) | 
| +      if modified_ts is None or modified_ts > cached_ts: | 
| +        stale = True | 
| +        logging.info('memcache too stale on shard %r because of all', | 
| +                     shard_id) | 
| + | 
| +    if not stale: | 
| +      logging.info('memcache hit on %r', shard_id) | 
| +      unfiltered_dict[shard_id] = cached_iids | 
| +      search_limit_reached_dict[shard_id] = search_limit_reached | 
| + | 
| +  return unfiltered_dict, search_limit_reached_dict | 
| + | 
| + | 
| +def _MakeBackendRequestHeaders(failfast): | 
| +  headers = { | 
| +    # This is needed to allow frontends to talk to backends without going | 
| +    # through a login screen on googleplex.com. | 
| +    # http://wiki/Main/PrometheusInternal#Internal_Applications_and_APIs | 
| +    'X-URLFetch-Service-Id': 'GOOGLEPLEX', | 
| +    } | 
| +  if failfast: | 
| +    headers['X-AppEngine-FailFast'] = 'Yes' | 
| +  return headers | 
| + | 
| + | 
| +def _StartBackendSearchCall( | 
| +    mr, query_project_names, shard_id, invalidation_timestep, | 
| +    deadline=None, failfast=True): | 
| +  """Ask a backend to query one shard of the database.""" | 
| +  backend_host = modules.get_hostname(module='besearch') | 
| +  url = 'http://%s%s' % (backend_host, framework_helpers.FormatURL( | 
| +      mr, urls.BACKEND_SEARCH, | 
| +      skip_filtering=True,  # TODO(jrobbins): remove after next release. | 
| +      projects=','.join(query_project_names), | 
| +      start=0, num=mr.start + mr.num, | 
| +      logged_in_user_id=mr.auth.user_id or 0, | 
| +      me_user_id=mr.me_user_id, shard_id=shard_id, | 
| +      invalidation_timestep=invalidation_timestep)) | 
| +  logging.info('\n\nCalling backend: %s', url) | 
| +  rpc = urlfetch.create_rpc( | 
| +      deadline=deadline or settings.backend_deadline) | 
| +  headers = _MakeBackendRequestHeaders(failfast) | 
| +  # follow_redirects=False is needed to avoid a login screen on googleplex. | 
| +  urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers) | 
| +  return rpc | 
| + | 
| + | 
| +def _StartBackendNonviewableCall( | 
| +    project_id, logged_in_user_id, shard_id, invalidation_timestep, | 
| +    deadline=None, failfast=True): | 
| +  """Ask a backend to query one shard of the database.""" | 
| +  backend_host = modules.get_hostname(module='besearch') | 
| +  url = 'http://%s%s' % (backend_host, framework_helpers.FormatURL( | 
| +      None, urls.BACKEND_NONVIEWABLE, | 
| +      project_id=project_id or '', | 
| +      logged_in_user_id=logged_in_user_id or '', | 
| +      shard_id=shard_id, | 
| +      invalidation_timestep=invalidation_timestep)) | 
| +  logging.info('Calling backend nonviewable: %s', url) | 
| +  rpc = urlfetch.create_rpc(deadline=deadline or settings.backend_deadline) | 
| +  headers = _MakeBackendRequestHeaders(failfast) | 
| +  # follow_redirects=False is needed to avoid a login screen on googleplex. | 
| +  urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers) | 
| +  return rpc | 
| + | 
| + | 
| +def _HandleBackendSearchResponse( | 
| +    mr, query_project_names, rpc_tuple, rpc_tuples, remaining_retries, | 
| +    unfiltered_iids, search_limit_reached, invalidation_timestep, | 
| +    error_responses): | 
| +  """Process one backend response and retry if there was an error.""" | 
| +  start_time, shard_id, rpc = rpc_tuple | 
| +  duration_sec = time.time() - start_time | 
| + | 
| +  try: | 
| +    response = rpc.get_result() | 
| +    logging.info('call to backend took %d sec', duration_sec) | 
| +    # Note that response.content has "})]'\n" prepended to it. | 
| +    json_content = response.content[5:] | 
| +    logging.info('got json text: %r length %r', | 
| +                 json_content[:framework_constants.LOGGING_MAX_LENGTH], | 
| +                 len(json_content)) | 
| +    json_data = json.loads(json_content) | 
| +    unfiltered_iids[shard_id] = json_data['unfiltered_iids'] | 
| +    search_limit_reached[shard_id] = json_data['search_limit_reached'] | 
| + | 
| +  except Exception as e: | 
| +    if duration_sec > FAIL_FAST_LIMIT_SEC:  # Don't log fail-fast exceptions. | 
| +      logging.exception(e) | 
| +    if not remaining_retries: | 
| +      logging.error('backend search retries exceeded') | 
| +      error_responses.add(shard_id) | 
| +      return  # Used all retries, so give up. | 
| + | 
| +    if duration_sec >= settings.backend_deadline: | 
| +      logging.error('backend search on %r took too long', shard_id) | 
| +      error_responses.add(shard_id) | 
| +      return  # That backend shard is overloaded, so give up. | 
| + | 
| +    logging.error('backend call for shard %r failed, retrying', shard_id) | 
| +    retry_rpc = _StartBackendSearchCall( | 
| +        mr, query_project_names, shard_id, invalidation_timestep, | 
| +        failfast=remaining_retries > 2) | 
| +    retry_rpc_tuple = (time.time(), shard_id, retry_rpc) | 
| +    retry_rpc.callback = _MakeBackendCallback( | 
| +        _HandleBackendSearchResponse, mr, query_project_names, | 
| +        retry_rpc_tuple, rpc_tuples, remaining_retries - 1, unfiltered_iids, | 
| +        search_limit_reached, invalidation_timestep, error_responses) | 
| +    rpc_tuples.append(retry_rpc_tuple) | 
| + | 
| + | 
| +def _HandleBackendNonviewableResponse( | 
| +    project_id, logged_in_user_id, shard_id, rpc_tuple, rpc_tuples, | 
| +    remaining_retries, nonviewable_iids, invalidation_timestep): | 
| +  """Process one backend response and retry if there was an error.""" | 
| +  start_time, shard_id, rpc = rpc_tuple | 
| +  duration_sec = time.time() - start_time | 
| + | 
| +  try: | 
| +    response = rpc.get_result() | 
| +    logging.info('call to backend nonviewable took %d sec', duration_sec) | 
| +    # Note that response.content has "})]'\n" prepended to it. | 
| +    json_content = response.content[5:] | 
| +    logging.info('got json text: %r length %r', | 
| +                 json_content[:framework_constants.LOGGING_MAX_LENGTH], | 
| +                 len(json_content)) | 
| +    json_data = json.loads(json_content) | 
| +    nonviewable_iids[shard_id] = set(json_data['nonviewable']) | 
| + | 
| +  except Exception as e: | 
| +    if duration_sec > FAIL_FAST_LIMIT_SEC:  # Don't log fail-fast exceptions. | 
| +      logging.exception(e) | 
| + | 
| +    if not remaining_retries: | 
| +      logging.warn('Used all retries, so give up on shard %r', shard_id) | 
| +      return | 
| + | 
| +    if duration_sec >= settings.backend_deadline: | 
| +      logging.error('nonviewable call on %r took too long', shard_id) | 
| +      return  # That backend shard is overloaded, so give up. | 
| + | 
| +    logging.error( | 
| +      'backend nonviewable call for shard %r;%r;%r failed, retrying', | 
| +      project_id, logged_in_user_id, shard_id) | 
| +    retry_rpc = _StartBackendNonviewableCall( | 
| +        project_id, logged_in_user_id, shard_id, invalidation_timestep, | 
| +        failfast=remaining_retries > 2) | 
| +    retry_rpc_tuple = (time.time(), shard_id, retry_rpc) | 
| +    retry_rpc.callback = _MakeBackendCallback( | 
| +        _HandleBackendNonviewableResponse, project_id, logged_in_user_id, | 
| +        shard_id, retry_rpc_tuple, rpc_tuples, remaining_retries - 1, | 
| +        nonviewable_iids, invalidation_timestep) | 
| +    rpc_tuples.append(retry_rpc_tuple) | 
| + | 
| + | 
| +def _TotalLength(sharded_iids): | 
| +  """Return the total length of all issue_iids lists.""" | 
| +  return sum(len(issue_iids) for issue_iids in sharded_iids.itervalues()) | 
| + | 
| + | 
| +def _ReverseShards(sharded_iids): | 
| +  """Reverse each issue_iids list in place.""" | 
| +  for shard_id in sharded_iids: | 
| +    sharded_iids[shard_id].reverse() | 
| + | 
| + | 
| +def _TrimEndShardedIIDs(sharded_iids, sample_iids, num_needed): | 
| +  """Trim the IIDs to keep at least num_needed items. | 
| + | 
| +  Args: | 
| +    sharded_iids: dict {shard_id: issue_id_list} for search results.  This is | 
| +        modified in place to remove some trailing issue IDs. | 
| +    sample_iids: list of IIDs from a sorted list of sample issues. | 
| +    num_needed: int minimum total number of items to keep.  Some IIDs that are | 
| +        known to belong in positions > num_needed will be trimmed off. | 
| + | 
| +  Returns: | 
| +    The total number of IIDs removed from the IID lists. | 
| +  """ | 
| +  # 1. Get (sample_iid, position_in_shard) for each sample. | 
| +  sample_positions = _CalcSamplePositions(sharded_iids, sample_iids) | 
| + | 
| +  # 2. Walk through the samples, computing a combined lower bound at each | 
| +  # step until we know that we have passed at least num_needed IIDs. | 
| +  lower_bound_per_shard = {} | 
| +  excess_samples = [] | 
| +  for i in range(len(sample_positions)): | 
| +    sample_iid, pos = sample_positions[i] | 
| +    shard_id = sample_iid % settings.num_logical_shards | 
| +    lower_bound_per_shard[shard_id] = pos | 
| +    overall_lower_bound = sum(lower_bound_per_shard.itervalues()) | 
| +    if overall_lower_bound >= num_needed: | 
| +      excess_samples = sample_positions[i + 1:] | 
| +      break | 
| +  else: | 
| +    return 0  # We went through all samples and never reached num_needed. | 
| + | 
| +  # 3. Truncate each shard at the first excess sample in that shard. | 
| +  already_trimmed = set() | 
| +  num_trimmed = 0 | 
| +  for sample_iid, pos in excess_samples: | 
| +    shard_id = sample_iid % settings.num_logical_shards | 
| +    if shard_id not in already_trimmed: | 
| +      num_trimmed += len(sharded_iids[shard_id]) - pos | 
| +      sharded_iids[shard_id] = sharded_iids[shard_id][:pos] | 
| +      already_trimmed.add(shard_id) | 
| + | 
| +  return num_trimmed | 
| + | 
| + | 
| +# TODO(jrobbins): Convert this to a python generator. | 
| +def _CalcSamplePositions(sharded_iids, sample_iids): | 
| +  """Return [(sample_iid, position_in_shard), ...] for each sample.""" | 
| +  # We keep track of how far index() has scanned in each shard to avoid | 
| +  # starting over at position 0 when looking for the next sample in | 
| +  # the same shard. | 
| +  scan_positions = collections.defaultdict(lambda: 0) | 
| +  sample_positions = [] | 
| +  for sample_iid in sample_iids: | 
| +    shard_id = sample_iid % settings.num_logical_shards | 
| +    try: | 
| +      pos = sharded_iids.get(shard_id, []).index( | 
| +          sample_iid, scan_positions[shard_id]) | 
| +      scan_positions[shard_id] = pos | 
| +      sample_positions.append((sample_iid, pos)) | 
| +    except ValueError: | 
| +      pass | 
| + | 
| +  return sample_positions | 
| + | 
| + | 
| +def _SortIssues(mr, issues, config, users_by_id): | 
| +  """Sort the found issues based on the request and config values. | 
| + | 
| +  Args: | 
| +    mr: common information parsed from the HTTP request. | 
| +    issues: A list of issues to be sorted. | 
| +    config: A ProjectIssueConfig that could impact sort order. | 
| +    users_by_id: dictionary {user_id: user_view,...} for all users who | 
| +      participate in any issue in the entire list. | 
| + | 
| +  Returns: | 
| +    A sorted list of issues, based on parameters from mr and config. | 
| +  """ | 
| +  issues = sorting.SortArtifacts( | 
| +      mr, issues, config, tracker_helpers.SORTABLE_FIELDS, | 
| +      username_cols=tracker_constants.USERNAME_COLS, users_by_id=users_by_id) | 
| +  return issues | 
|  |