OLD | NEW |
(Empty) | |
| 1 # Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 # Use of this source code is governed by a BSD-style
| 3 # license that can be found in the LICENSE file or at |
| 4 # https://developers.google.com/open-source/licenses/bsd |
| 5 |
| 6 """The FrontendSearchPipeline class manages issue search and sorting. |
| 7 |
| 8 The frontend pipeline checks memcache for cached results in each shard. It |
| 9 then calls backend jobs to search any shards that had a cache miss. On cache hit,
| 10 the cached results must be filtered by permissions, so the at-risk cache and |
| 11 backends are consulted. Next, the sharded results are combined into an overall |
| 12 list of IIDs. Then, that list is paginated and the issues on the current |
| 13 pagination page can be shown. Alternatively, this class can determine just the |
| 14 position the currently shown issue would occupy in the overall sorted list. |
| 15 """ |
| 16 |
| 17 import json |
| 18 |
| 19 import collections |
| 20 import logging |
| 21 import math |
| 22 import random |
| 23 import time |
| 24 |
| 25 from google.appengine.api import apiproxy_stub_map |
| 26 from google.appengine.api import memcache |
| 27 from google.appengine.api import modules |
| 28 from google.appengine.api import urlfetch |
| 29 |
| 30 import settings |
| 31 from features import savedqueries_helpers |
| 32 from framework import framework_constants |
| 33 from framework import framework_helpers |
| 34 from framework import paginate |
| 35 from framework import sorting |
| 36 from framework import urls |
| 37 from search import query2ast |
| 38 from search import searchpipeline |
| 39 from services import fulltext_helpers |
| 40 from tracker import tracker_bizobj |
| 41 from tracker import tracker_constants |
| 42 from tracker import tracker_helpers |
| 43 |
| 44 |
| 45 # Fail-fast responses usually finish in less than 50ms. If we see a failure |
| 46 # in under that amount of time, we don't bother logging it. |
| 47 FAIL_FAST_LIMIT_SEC = 0.050 |
| 48 |
| 49 # The choices help balance the cost of choosing samples vs. the cost of |
| 50 # selecting issues that are in a range bounded by neighboring samples. |
| 51 # Preferred chunk size parameters were determined by experimentation. |
| 52 MIN_SAMPLE_CHUNK_SIZE = int( |
| 53 math.sqrt(tracker_constants.DEFAULT_RESULTS_PER_PAGE)) |
| 54 MAX_SAMPLE_CHUNK_SIZE = int(math.sqrt(settings.search_limit_per_shard)) |
| 55 PREFERRED_NUM_CHUNKS = 50 |
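| # For illustration only (the real values live in tracker_constants and
| # settings): if DEFAULT_RESULTS_PER_PAGE were 100 and search_limit_per_shard
| # were 10,000, sample chunk sizes would be clamped to the range [10, 100].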
| 56 |
| 57 |
| 58 class FrontendSearchPipeline(object): |
| 59 """Manage the process of issue search, including backends and caching. |
| 60 |
| 61   Even though the code is divided into several methods, the public
| 62   methods should be called in sequence, so execution proceeds roughly
| 63   in the order of the source code lines here.
| 64 """ |
| 65 |
| 66 def __init__(self, mr, services, prof, default_results_per_page): |
| 67 self.mr = mr |
| 68 self.services = services |
| 69 self.profiler = prof |
| 70 self.default_results_per_page = default_results_per_page |
| 71 self.grid_mode = (mr.mode == 'grid') |
| 72 self.grid_limited = False |
| 73 self.pagination = None |
| 74 self.num_skipped_at_start = 0 |
| 75 self.total_count = 0 |
| 76 |
| 77 self.query_project_names = set() |
| 78 if mr.query_project_names: |
| 79 self.query_project_names.update(mr.query_project_names) |
| 80 |
| 81 projects = services.project.GetProjectsByName( |
| 82 mr.cnxn, self.query_project_names).values() |
| 83 self.query_project_ids = [p.project_id for p in projects] |
| 84 if mr.project_name: |
| 85 self.query_project_ids.append(mr.project_id) |
| 86 self.query_project_names.add(mr.project_name) |
| 87 |
| 88 config_dict = self.services.config.GetProjectConfigs( |
| 89 mr.cnxn, self.query_project_ids) |
| 90 self.harmonized_config = tracker_bizobj.HarmonizeConfigs( |
| 91 config_dict.values()) |
| 92 |
| 93 # The following fields are filled in as the pipeline progresses. |
| 94 # The value None means that we still need to compute that value. |
| 95 self.users_by_id = {} |
| 96 self.nonviewable_iids = {} # {shard_id: set(iid)} |
| 97 self.unfiltered_iids = {} # {shard_id: [iid, ...]} needing perm checks. |
| 98 self.filtered_iids = {} # {shard_id: [iid, ...]} already perm checked. |
| 99     self.search_limit_reached = {} # {shard_id: bool}.
| 100     self.counts = {}
| 101     self.allowed_iids = [] # Matching iids that the user is permitted to view.
| 102 self.allowed_results = None # results that the user is permitted to view. |
| 103 self.visible_results = None # allowed_results on current pagination page. |
| 104 self.error_responses = set() |
| 105 |
| 106 # Projects that contain the result issues. This starts off as a dict of |
| 107 # all the query projects, but it can grow based on the found issues in the |
| 108 # case where the user is searching across the entire site. |
| 109 self.issue_projects = {p.project_id: p for p in projects} |
| 110 |
| 111 error_msg = query2ast.CheckSyntax( |
| 112 self.mr.query, self.harmonized_config, warnings=self.mr.warnings) |
| 113 if error_msg: |
| 114 self.mr.errors.query = error_msg |
| 115 |
| 116 def SearchForIIDs(self): |
| 117 """Use backends to search each shard and store their results.""" |
| 118 with self.profiler.Phase('Checking cache and calling Backends'): |
| 119 rpc_tuples = _StartBackendSearch( |
| 120 self.mr, self.query_project_names, self.query_project_ids, |
| 121 self.harmonized_config, self.unfiltered_iids, |
| 122 self.search_limit_reached, self.nonviewable_iids, |
| 123 self.error_responses, self.services) |
| 124 |
| 125 with self.profiler.Phase('Waiting for Backends'): |
| 126 try: |
| 127 _FinishBackendSearch(rpc_tuples) |
| 128 except Exception as e: |
| 129 logging.exception(e) |
| 130 raise |
| 131 |
| 132 if self.error_responses: |
| 133 logging.error('%r error responses. Incomplete search results.', |
| 134 self.error_responses) |
| 135 |
| 136 with self.profiler.Phase('Filtering cached results'): |
| 137 for shard_id in self.unfiltered_iids: |
| 138 if shard_id not in self.nonviewable_iids: |
| 139 logging.error( |
| 140 'Not displaying shard %r because of no nonviewable_iids', shard_id) |
| 141 self.error_responses.add(shard_id) |
| 142 filtered_shard_iids = [] |
| 143 else: |
| 144 unfiltered_shard_iids = self.unfiltered_iids[shard_id] |
| 145 nonviewable_shard_iids = self.nonviewable_iids[shard_id] |
| 146 # TODO(jrobbins): avoid creating large temporary lists. |
| 147 filtered_shard_iids = [iid for iid in unfiltered_shard_iids |
| 148 if iid not in nonviewable_shard_iids] |
| 149 if self.grid_mode: |
| 150 self.filtered_iids[shard_id] = filtered_shard_iids |
| 151 else: |
| 152 self.filtered_iids[shard_id] = filtered_shard_iids[ |
| 153 :self.mr.start + self.mr.num] |
| 154 self.counts[shard_id] = len(filtered_shard_iids) |
| 155 |
| 156 with self.profiler.Phase('Counting all filtered results'): |
| 157 self.total_count = sum(self.counts.itervalues()) |
| 158 |
| 159 def MergeAndSortIssues(self): |
| 160 """Merge and sort results from all shards into one combined list.""" |
| 161 with self.profiler.Phase('selecting issues to merge and sort'): |
| 162 if not self.grid_mode: |
| 163 self._NarrowFilteredIIDs() |
| 164 self.allowed_iids = [] |
| 165 for filtered_shard_iids in self.filtered_iids.itervalues(): |
| 166 self.allowed_iids.extend(filtered_shard_iids) |
| 167 |
| 168 # The grid view is not paginated, so limit the results shown to avoid |
| 169       # generating an HTML page that would be too large.
| 170 limit = settings.max_issues_in_grid |
| 171 if self.grid_mode and len(self.allowed_iids) > limit: |
| 172 self.grid_limited = True |
| 173 self.allowed_iids = self.allowed_iids[:limit] |
| 174 |
| 175 with self.profiler.Phase('getting allowed results'): |
| 176 self.allowed_results = self.services.issue.GetIssues( |
| 177 self.mr.cnxn, self.allowed_iids) |
| 178 |
| 179 # Note: At this point, we have results that are only sorted within |
| 180 # each backend's shard. We still need to sort the merged result. |
| 181 self._LookupNeededUsers(self.allowed_results) |
| 182 with self.profiler.Phase('merging and sorting issues'): |
| 183 self.allowed_results = _SortIssues( |
| 184 self.mr, self.allowed_results, self.harmonized_config, |
| 185 self.users_by_id) |
| 186 |
| 187 def _NarrowFilteredIIDs(self): |
| 188 """Combine filtered shards into a range of IIDs for issues to sort. |
| 189 |
| 190     The naive way is to concatenate shard_iids[:start + num] for all
| 191     shards and then select [start:start + num]. We do better by sampling
| 192     issues and determining which of those samples are known to come
| 193     before start or after start+num. We then trim off all those IIDs
| 194     and sort a smaller range of IIDs that might actually be displayed.
| 195     See the design doc at go/monorail-sorting.
| 196
| 197     This method modifies self.filtered_iids and self.num_skipped_at_start.
| 198 """ |
| 199 # Sample issues and skip those that are known to come before start. |
| 200 # See the "Sorting in Monorail" design doc. |
| 201 |
| 202 # If the result set is small, don't bother optimizing it. |
| 203 orig_length = _TotalLength(self.filtered_iids) |
| 204 if orig_length < self.mr.num * 4: |
| 205 return |
| 206 |
| 207 # 1. Get sample issues in each shard and sort them all together. |
| 208 last = self.mr.start + self.mr.num |
| 209 on_hand_samples = {} |
| 210 needed_iids = [] |
| 211 for shard_id in self.filtered_iids: |
| 212 self._AccumulateSampleIssues( |
| 213 self.filtered_iids[shard_id], on_hand_samples, needed_iids) |
| 214 retrieved_samples = self.services.issue.GetIssuesDict( |
| 215 self.mr.cnxn, needed_iids) |
| 216 sample_issues = on_hand_samples.values() + retrieved_samples.values() |
| 217 self._LookupNeededUsers(sample_issues) |
| 218 sample_issues = _SortIssues( |
| 219 self.mr, sample_issues, self.harmonized_config, self.users_by_id) |
| 220 sample_iids = [issue.issue_id for issue in sample_issues] |
| 221 |
| 222 # 2. Trim off some IIDs that are sure to be positioned after last. |
| 223 num_trimmed_end = _TrimEndShardedIIDs(self.filtered_iids, sample_iids, last) |
| 224 logging.info('Trimmed %r issues from the end of shards', num_trimmed_end) |
| 225 |
| 226     # 3. Trim off some IIDs that are sure to be positioned before start.
| 227 keep = _TotalLength(self.filtered_iids) - self.mr.start |
| 228 # Reverse the sharded lists. |
| 229 _ReverseShards(self.filtered_iids) |
| 230 sample_iids.reverse() |
| 231 self.num_skipped_at_start = _TrimEndShardedIIDs( |
| 232 self.filtered_iids, sample_iids, keep) |
| 233 logging.info('Trimmed %r issues from the start of shards', |
| 234 self.num_skipped_at_start) |
| 235 # Reverse sharded lists again to get back into forward order. |
| 236 _ReverseShards(self.filtered_iids) |
| 237 |
| 238 def DetermineIssuePosition(self, issue): |
| 239 """Calculate info needed to show the issue flipper. |
| 240 |
| 241 Args: |
| 242 issue: The issue currently being viewed. |
| 243 |
| 244 Returns: |
| 245       A 3-tuple (prev_iid, index, next_iid) where prev_iid is the
| 246 IID of the previous issue in the total ordering (or None), |
| 247 index is the index that the current issue has in the total |
| 248 ordering, and next_iid is the next issue (or None). If the current |
| 249 issue is not in the list of results at all, returns None, None, None. |
| 250 """ |
| 251 # 1. If the current issue is not in the results at all, then exit. |
| 252 if not any(issue.issue_id in filtered_shard_iids |
| 253 for filtered_shard_iids in self.filtered_iids.itervalues()): |
| 254 return None, None, None |
| 255 |
| 256 # 2. Choose and retrieve sample issues in each shard. |
| 257 samples_by_shard = {} # {shard_id: {iid: issue}} |
| 258 needed_iids = [] |
| 259 for shard_id in self.filtered_iids: |
| 260 samples_by_shard[shard_id] = {} |
| 261 self._AccumulateSampleIssues( |
| 262 self.filtered_iids[shard_id], samples_by_shard[shard_id], needed_iids) |
| 263 retrieved_samples = self.services.issue.GetIssuesDict( |
| 264 self.mr.cnxn, needed_iids) |
| 265 for retrieved_iid, retrieved_issue in retrieved_samples.iteritems(): |
| 266 shard_id = retrieved_iid % settings.num_logical_shards |
| 267 samples_by_shard[shard_id][retrieved_iid] = retrieved_issue |
| 268 |
| 269 # 3. Build up partial results for each shard. |
| 270     preceding_counts = {} # dict {shard_id: num_issues_preceding_current}
| 271 prev_candidates, next_candidates = [], [] |
| 272 for shard_id in self.filtered_iids: |
| 273 prev_candidate, index_in_shard, next_candidate = ( |
| 274 self._DetermineIssuePositionInShard( |
| 275 shard_id, issue, samples_by_shard[shard_id])) |
| 276       preceding_counts[shard_id] = index_in_shard
| 277 if prev_candidate: |
| 278 prev_candidates.append(prev_candidate) |
| 279 if next_candidate: |
| 280 next_candidates.append(next_candidate) |
| 281 |
| 282 # 4. Combine the results. |
| 283     index = sum(preceding_counts.itervalues())
| 284 prev_candidates = _SortIssues( |
| 285 self.mr, prev_candidates, self.harmonized_config, self.users_by_id) |
| 286 prev_iid = prev_candidates[-1].issue_id if prev_candidates else None |
| 287 next_candidates = _SortIssues( |
| 288 self.mr, next_candidates, self.harmonized_config, self.users_by_id) |
| 289 next_iid = next_candidates[0].issue_id if next_candidates else None |
| 290 |
| 291 return prev_iid, index, next_iid |
| 292 |
| 293 def _DetermineIssuePositionInShard(self, shard_id, issue, sample_dict): |
| 294 """Determine where the given issue would fit into results from a shard.""" |
| 295 # See the design doc for details. Basically, it first surveys the results |
| 296 # to bound a range where the given issue would belong, then it fetches the |
| 297 # issues in that range and sorts them. |
| 298 |
| 299 filtered_shard_iids = self.filtered_iids[shard_id] |
| 300 |
| 301 # 1. Select a sample of issues, leveraging ones we have in RAM already. |
| 302 issues_on_hand = sample_dict.values() |
| 303 if issue.issue_id not in sample_dict: |
| 304 issues_on_hand.append(issue) |
| 305 |
| 306 self._LookupNeededUsers(issues_on_hand) |
| 307 sorted_on_hand = _SortIssues( |
| 308 self.mr, issues_on_hand, self.harmonized_config, self.users_by_id) |
| 309 sorted_on_hand_iids = [soh.issue_id for soh in sorted_on_hand] |
| 310 index_in_on_hand = sorted_on_hand_iids.index(issue.issue_id) |
| 311 |
| 312     # 2. Bound the gap around where the issue belongs.
| 313 if index_in_on_hand == 0: |
| 314 fetch_start = 0 |
| 315 else: |
| 316 prev_on_hand_iid = sorted_on_hand_iids[index_in_on_hand - 1] |
| 317 fetch_start = filtered_shard_iids.index(prev_on_hand_iid) + 1 |
| 318 |
| 319 if index_in_on_hand == len(sorted_on_hand) - 1: |
| 320 fetch_end = len(filtered_shard_iids) |
| 321 else: |
| 322 next_on_hand_iid = sorted_on_hand_iids[index_in_on_hand + 1] |
| 323 fetch_end = filtered_shard_iids.index(next_on_hand_iid) |
| 324 |
| 325 # 3. Retrieve all the issues in that gap to get an exact answer. |
| 326 fetched_issues = self.services.issue.GetIssues( |
| 327 self.mr.cnxn, filtered_shard_iids[fetch_start:fetch_end]) |
| 328 if issue.issue_id not in filtered_shard_iids[fetch_start:fetch_end]: |
| 329 fetched_issues.append(issue) |
| 330 self._LookupNeededUsers(fetched_issues) |
| 331 sorted_fetched = _SortIssues( |
| 332 self.mr, fetched_issues, self.harmonized_config, self.users_by_id) |
| 333 sorted_fetched_iids = [sf.issue_id for sf in sorted_fetched] |
| 334 index_in_fetched = sorted_fetched_iids.index(issue.issue_id) |
| 335 |
| 336 # 4. Find the issues that come immediately before and after the place where |
| 337 # the given issue would belong in this shard. |
| 338 if index_in_fetched > 0: |
| 339 prev_candidate = sorted_fetched[index_in_fetched - 1] |
| 340 elif index_in_on_hand > 0: |
| 341 prev_candidate = sorted_on_hand[index_in_on_hand - 1] |
| 342 else: |
| 343 prev_candidate = None |
| 344 |
| 345 if index_in_fetched < len(sorted_fetched) - 1: |
| 346 next_candidate = sorted_fetched[index_in_fetched + 1] |
| 347 elif index_in_on_hand < len(sorted_on_hand) - 1: |
| 348 next_candidate = sorted_on_hand[index_in_on_hand + 1] |
| 349 else: |
| 350 next_candidate = None |
| 351 |
| 352 return prev_candidate, fetch_start + index_in_fetched, next_candidate |
| 353 |
| 354 def _AccumulateSampleIssues(self, issue_ids, sample_dict, needed_iids): |
| 355 """Select a scattering of issues from the list, leveraging RAM cache.""" |
| 356 chunk_size = max(MIN_SAMPLE_CHUNK_SIZE, min(MAX_SAMPLE_CHUNK_SIZE, |
| 357 int(len(issue_ids) / PREFERRED_NUM_CHUNKS))) |
| 358 for i in range(chunk_size, len(issue_ids), chunk_size): |
| 359 issue = self.services.issue.GetAnyOnHandIssue( |
| 360 issue_ids, start=i, end=min(i + chunk_size, len(issue_ids))) |
| 361 if issue: |
| 362 sample_dict[issue.issue_id] = issue |
| 363 else: |
| 364 needed_iids.append(issue_ids[i]) |
| 365 |
| 366 def _LookupNeededUsers(self, issues): |
| 367 """Look up user info needed to sort issues, if any.""" |
| 368 with self.profiler.Phase('lookup of owner, reporter, and cc'): |
| 369 additional_user_views_by_id = ( |
| 370 tracker_helpers.MakeViewsForUsersInIssues( |
| 371 self.mr.cnxn, issues, self.services.user, |
| 372 omit_ids=self.users_by_id.keys())) |
| 373 self.users_by_id.update(additional_user_views_by_id) |
| 374 |
| 375 def Paginate(self): |
| 376 """Fetch matching issues and paginate the search results. |
| 377 |
| 378 These two actions are intertwined because we try to only |
| 379 retrieve the Issues on the current pagination page. |
| 380 """ |
| 381 if self.grid_mode: |
| 382       # The grid view is not paginated, but the pagination object shows counts.
| 383 self.pagination = paginate.ArtifactPagination( |
| 384 self.mr, self.allowed_results, self.default_results_per_page, |
| 385 total_count=self.total_count, list_page_url=urls.ISSUE_LIST) |
| 386 # We limited the results, but still show the original total count. |
| 387 self.visible_results = self.allowed_results |
| 388 |
| 389 else: |
| 390 # We already got the issues, just display a slice of the visible ones. |
| 391 limit_reached = False |
| 392 for shard_limit_reached in self.search_limit_reached.values(): |
| 393 limit_reached |= shard_limit_reached |
| 394 self.pagination = paginate.ArtifactPagination( |
| 395 self.mr, self.allowed_results, self.default_results_per_page, |
| 396 total_count=self.total_count, list_page_url=urls.ISSUE_LIST, |
| 397 limit_reached=limit_reached, skipped=self.num_skipped_at_start) |
| 398 self.visible_results = self.pagination.visible_results |
| 399 |
| 400 # If we were not forced to look up visible users already, do it now. |
| 401 if self.grid_mode: |
| 402 self._LookupNeededUsers(self.allowed_results) |
| 403 else: |
| 404 self._LookupNeededUsers(self.visible_results) |
| 405 |
| 406 def __repr__(self): |
| 407 """Return a string that shows the internal state of this pipeline.""" |
| 408 if self.allowed_iids: |
| 409 shown_allowed_iids = self.allowed_iids[:200] |
| 410 else: |
| 411 shown_allowed_iids = self.allowed_iids |
| 412 |
| 413 if self.allowed_results: |
| 414 shown_allowed_results = self.allowed_results[:200] |
| 415 else: |
| 416 shown_allowed_results = self.allowed_results |
| 417 |
| 418 parts = [ |
| 419 'allowed_iids: %r' % shown_allowed_iids, |
| 420 'allowed_results: %r' % shown_allowed_results, |
| 421 'len(visible_results): %r' % ( |
| 422 self.visible_results and len(self.visible_results))] |
| 423 return '%s(%s)' % (self.__class__.__name__, '\n'.join(parts)) |
| 424 |
| 425 |
| 426 def _MakeBackendCallback(func, *args): |
| 427 return lambda: func(*args) |
| 428 |
| 429 |
| 430 def _StartBackendSearch( |
| 431 mr, query_project_names, query_project_ids, harmonized_config, |
| 432 unfiltered_iids_dict, search_limit_reached_dict, |
| 433 nonviewable_iids, error_responses, services): |
| 434 """Request that our backends search and return a list of matching issue IDs. |
| 435 |
| 436 Args: |
| 437 mr: commonly used info parsed from the request, including query and |
| 438 sort spec. |
| 439 query_project_names: set of project names to search. |
| 440 query_project_ids: list of project IDs to search. |
| 441 harmonized_config: combined ProjectIssueConfig for all projects being |
| 442 searched. |
| 443 unfiltered_iids_dict: dict {shard_id: [iid, ...]} of unfiltered search |
| 444 results to accumulate into. They need to be later filtered by |
| 445 permissions and merged into filtered_iids_dict. |
| 446     search_limit_reached_dict: dict {shard_id: bool} recording whether the
| 447       search limit of each shard was reached.
| 448 nonviewable_iids: dict {shard_id: set(iid)} of restricted issues in the |
| 449       projects being searched that the signed in user cannot view.
|     error_responses: set of shard IDs whose backend calls failed; shards
|       are added here as errors occur.
| 450     services: connections to backends.
| 451 |
| 452 Returns: |
| 453 A list of rpc_tuples that can be passed to _FinishBackendSearch to wait |
| 454 on any remaining backend calls. |
| 455 |
| 456 SIDE-EFFECTS: |
| 457 Any data found in memcache is immediately put into unfiltered_iids_dict. |
| 458 As the backends finish their work, _HandleBackendSearchResponse will update |
| 459 unfiltered_iids_dict for those shards. |
| 460 """ |
| 461 rpc_tuples = [] |
| 462 needed_shard_ids = set(range(settings.num_logical_shards)) |
| 463 |
| 464 # 1. Get whatever we can from memcache. Cache hits are only kept if they are |
| 465 # not already expired. Each kept cache hit will have unfiltered IIDs, so we |
| 466 # need to get the at-risk IIDs to efficiently filter them based on perms. |
| 467 project_shard_timestamps = _GetProjectTimestamps( |
| 468 query_project_ids, needed_shard_ids) |
| 469 |
| 470 if mr.use_cached_searches: |
| 471 cached_unfiltered_iids_dict, cached_search_limit_reached_dict = ( |
| 472 _GetCachedSearchResults( |
| 473 mr, query_project_ids, needed_shard_ids, harmonized_config, |
| 474 project_shard_timestamps, services)) |
| 475 unfiltered_iids_dict.update(cached_unfiltered_iids_dict) |
| 476 search_limit_reached_dict.update(cached_search_limit_reached_dict) |
| 477 for cache_hit_shard_id in unfiltered_iids_dict: |
| 478 needed_shard_ids.remove(cache_hit_shard_id) |
| 479 |
| 480 _GetNonviewableIIDs( |
| 481 query_project_ids, mr.auth.user_id, set(range(settings.num_logical_shards)), |
| 482 rpc_tuples, nonviewable_iids, project_shard_timestamps, |
| 483 services.cache_manager.processed_invalidations_up_to, |
| 484 mr.use_cached_searches) |
| 485 |
| 486 # 2. Hit backends for any shards that are still needed. When these results |
| 487   # come back, they are also put into unfiltered_iids_dict.
| 488 for shard_id in needed_shard_ids: |
| 489 rpc = _StartBackendSearchCall( |
| 490 mr, query_project_names, shard_id, |
| 491 services.cache_manager.processed_invalidations_up_to) |
| 492 rpc_tuple = (time.time(), shard_id, rpc) |
| 493 rpc.callback = _MakeBackendCallback( |
| 494 _HandleBackendSearchResponse, mr, query_project_names, rpc_tuple, |
| 495 rpc_tuples, settings.backend_retries, unfiltered_iids_dict, |
| 496 search_limit_reached_dict, |
| 497 services.cache_manager.processed_invalidations_up_to, |
| 498 error_responses) |
| 499 rpc_tuples.append(rpc_tuple) |
| 500 |
| 501 return rpc_tuples |
| 502 |
| 503 |
| 504 def _FinishBackendSearch(rpc_tuples): |
| 505 """Wait for all backend calls to complete, including any retries.""" |
| 506 while rpc_tuples: |
| 507 active_rpcs = [rpc for (_time, _shard_id, rpc) in rpc_tuples] |
| 508     # Wait for any active RPC to complete. Its callback function will
| 509 # automatically be called. |
| 510 finished_rpc = apiproxy_stub_map.UserRPC.wait_any(active_rpcs) |
| 511 # Figure out which rpc_tuple finished and remove it from our list. |
| 512 for rpc_tuple in rpc_tuples: |
| 513 _time, _shard_id, rpc = rpc_tuple |
| 514 if rpc == finished_rpc: |
| 515 rpc_tuples.remove(rpc_tuple) |
| 516 break |
| 517 else: |
| 518 raise ValueError('We somehow finished an RPC that is not in rpc_tuples') |
| 519 |
| 520 |
| 521 def _GetProjectTimestamps(query_project_ids, needed_shard_ids): |
| 522 """Get a dict of modified_ts values for all specified project-shards.""" |
| 523 project_shard_timestamps = {} |
| 524 if query_project_ids: |
| 525 keys = [] |
| 526 for pid in query_project_ids: |
| 527 for sid in needed_shard_ids: |
| 528 keys.append('%d;%d' % (pid, sid)) |
| 529 else: |
| 530 keys = [('all;%d' % sid) for sid in needed_shard_ids] |
| 531 |
| 532 timestamps_for_project = memcache.get_multi(keys=keys) |
| 533 for key, timestamp in timestamps_for_project.iteritems(): |
| 534 pid_str, sid_str = key.split(';') |
| 535 if pid_str == 'all': |
| 536 project_shard_timestamps['all', int(sid_str)] = timestamp |
| 537 else: |
| 538 project_shard_timestamps[int(pid_str), int(sid_str)] = timestamp |
| 539 |
| 540 return project_shard_timestamps |
| 541 |
| 542 |
| 543 def _GetNonviewableIIDs( |
| 544 query_project_ids, logged_in_user_id, needed_shard_ids, rpc_tuples, |
| 545 nonviewable_iids, project_shard_timestamps, invalidation_timestep, |
| 546 use_cached_searches): |
| 547 """Build a set of at-risk IIDs, and accumulate RPCs to get uncached ones.""" |
| 548 if query_project_ids: |
| 549 keys = [] |
| 550 for pid in query_project_ids: |
| 551 for sid in needed_shard_ids: |
| 552 keys.append('%d;%d;%d' % (pid, logged_in_user_id, sid)) |
| 553 else: |
| 554     keys = [('all;%d;%d' % (logged_in_user_id, sid))
| 555             for sid in needed_shard_ids]
| 556 |
| 557 if use_cached_searches: |
| 558 cached_dict = memcache.get_multi(keys, key_prefix='nonviewable:') |
| 559 else: |
| 560 cached_dict = {} |
| 561 |
| 562 for sid in needed_shard_ids: |
| 563 if query_project_ids: |
| 564 for pid in query_project_ids: |
| 565 _AccumulateNonviewableIIDs( |
| 566 pid, logged_in_user_id, sid, cached_dict, nonviewable_iids, |
| 567 project_shard_timestamps, rpc_tuples, invalidation_timestep) |
| 568 else: |
| 569 _AccumulateNonviewableIIDs( |
| 570 None, logged_in_user_id, sid, cached_dict, nonviewable_iids, |
| 571 project_shard_timestamps, rpc_tuples, invalidation_timestep) |
| 572 |
| 573 |
| 574 def _AccumulateNonviewableIIDs( |
| 575 pid, logged_in_user_id, sid, cached_dict, nonviewable_iids, |
| 576 project_shard_timestamps, rpc_tuples, invalidation_timestep): |
| 577 """Use one of the retrieved cache entries or call a backend if needed.""" |
| 578 if pid is None: |
| 579 key = 'all;%d;%d' % (logged_in_user_id, sid) |
| 580 else: |
| 581 key = '%d;%d;%d' % (pid, logged_in_user_id, sid) |
| 582 |
| 583 if key in cached_dict: |
| 584 issue_ids, cached_ts = cached_dict.get(key) |
| 585 modified_ts = project_shard_timestamps.get((pid, sid)) |
| 586 if modified_ts is None or modified_ts > cached_ts: |
| 587 logging.info('nonviewable too stale on (project %r, shard %r)', |
| 588 pid, sid) |
| 589 else: |
| 590 logging.info('adding %d nonviewable issue_ids', len(issue_ids)) |
| 591 nonviewable_iids[sid] = set(issue_ids) |
| 592 |
| 593 if sid not in nonviewable_iids: |
| 594 logging.info('nonviewable for %r not found', key) |
| 595 logging.info('starting backend call for nonviewable iids %r', key) |
| 596 rpc = _StartBackendNonviewableCall( |
| 597 pid, logged_in_user_id, sid, invalidation_timestep) |
| 598 rpc_tuple = (time.time(), sid, rpc) |
| 599 rpc.callback = _MakeBackendCallback( |
| 600 _HandleBackendNonviewableResponse, pid, logged_in_user_id, sid, |
| 601 rpc_tuple, rpc_tuples, settings.backend_retries, nonviewable_iids, |
| 602 invalidation_timestep) |
| 603 rpc_tuples.append(rpc_tuple) |
| 604 |
| 605 |
| 606 def _GetCachedSearchResults( |
| 607 mr, query_project_ids, needed_shard_ids, harmonized_config, |
| 608 project_shard_timestamps, services): |
| 609 """Return a dict of cached search results that are not already stale. |
| 610 |
| 611 If it were not for cross-project search, we would simply cache when we do a |
| 612 search and then invalidate when an issue is modified. But, with |
| 613 cross-project search we don't know all the memcache entries that would |
| 614 need to be invalidated. So, instead, we write the search result cache |
| 615 entries and then an initial modified_ts value for each project if it was |
| 616 not already there. And, when we update an issue we write a new |
| 617     modified_ts entry, which implicitly invalidates all search result
| 618 cache entries that were written earlier because they are now stale. When |
| 619 reading from the cache, we ignore any query project with modified_ts |
| 620 after its search result cache timestamp, because it is stale. |
| 621 |
| 622 Args: |
| 623 mr: common information parsed from the request. |
| 624 query_project_ids: list of project ID numbers for all projects being |
| 625 searched. |
| 626 needed_shard_ids: set of shard IDs that need to be checked. |
| 627     harmonized_config: ProjectIssueConfig with combined information for all
| 628 projects involved in this search. |
| 629 project_shard_timestamps: a dict {(project_id, shard_id): timestamp, ...} |
| 630 that tells when each shard was last invalidated. |
| 631 services: connections to backends. |
| 632 |
| 633 Returns: |
| 634 Tuple consisting of: |
| 635 A dictionary {shard_id: [issue_id, ...], ...} of unfiltered search result |
| 636 issue IDs. Only shard_ids found in memcache will be in that dictionary. |
| 637 The result issue IDs must be permission checked before they can be |
| 638 considered to be part of the user's result set. |
| 639 A dictionary {shard_id: bool, ...}. The boolean is set to True if |
| 640 the search results limit of the shard is hit. |
| 641 """ |
| 642 projects_str = ','.join(str(pid) for pid in sorted(query_project_ids)) |
| 643 projects_str = projects_str or 'all' |
| 644 canned_query = savedqueries_helpers.SavedQueryIDToCond( |
| 645 mr.cnxn, services.features, mr.can) |
| 646 logging.info('canned query is %r', canned_query) |
| 647 canned_query = searchpipeline.ReplaceKeywordsWithUserID( |
| 648 mr.me_user_id, canned_query) |
| 649 user_query = searchpipeline.ReplaceKeywordsWithUserID( |
| 650 mr.me_user_id, mr.query) |
| 651 |
| 652 sd = sorting.ComputeSortDirectives(mr, harmonized_config) |
| 653 memcache_prefix = ';'.join([projects_str, canned_query, user_query, |
| 654 ' '.join(sd), '']) |
| 655 cached_dict = memcache.get_multi( |
| 656 [str(sid) for sid in needed_shard_ids], key_prefix=memcache_prefix) |
| 657 search_limit_memcache_prefix = ';'.join( |
| 658 [projects_str, canned_query, user_query, |
| 659 ' '.join(sd), 'search_limit_reached', '']) |
| 660 cached_search_limit_reached_dict = memcache.get_multi( |
| 661 [str(sid) for sid in needed_shard_ids], |
| 662 key_prefix=search_limit_memcache_prefix) |
| 663 |
| 664 unfiltered_dict = {} |
| 665 search_limit_reached_dict = {} |
| 666 for shard_id in needed_shard_ids: |
| 667 if str(shard_id) not in cached_dict: |
| 668 logging.info('memcache miss on shard %r', shard_id) |
| 669 continue |
| 670 |
| 671 cached_iids, cached_ts = cached_dict[str(shard_id)] |
| 672 if cached_search_limit_reached_dict.get(str(shard_id)): |
| 673 search_limit_reached, _ = cached_search_limit_reached_dict[str(shard_id)] |
| 674 else: |
| 675 search_limit_reached = False |
| 676 |
| 677 stale = False |
| 678 if query_project_ids: |
| 679 for project_id in query_project_ids: |
| 680 modified_ts = project_shard_timestamps.get((project_id, shard_id)) |
| 681 if modified_ts is None or modified_ts > cached_ts: |
| 682 stale = True |
| 683 logging.info('memcache too stale on shard %r because of %r', |
| 684 shard_id, project_id) |
| 685 break |
| 686 else: |
| 687 modified_ts = project_shard_timestamps.get(('all', shard_id)) |
| 688 if modified_ts is None or modified_ts > cached_ts: |
| 689 stale = True |
| 690 logging.info('memcache too stale on shard %r because of all', |
| 691 shard_id) |
| 692 |
| 693 if not stale: |
| 694 logging.info('memcache hit on %r', shard_id) |
| 695 unfiltered_dict[shard_id] = cached_iids |
| 696 search_limit_reached_dict[shard_id] = search_limit_reached |
| 697 |
| 698 return unfiltered_dict, search_limit_reached_dict |
| 699 |
| 700 |
| 701 def _MakeBackendRequestHeaders(failfast): |
| 702 headers = { |
| 703 # This is needed to allow frontends to talk to backends without going |
| 704 # through a login screen on googleplex.com. |
| 705 # http://wiki/Main/PrometheusInternal#Internal_Applications_and_APIs |
| 706 'X-URLFetch-Service-Id': 'GOOGLEPLEX', |
| 707 } |
| 708 if failfast: |
| 709 headers['X-AppEngine-FailFast'] = 'Yes' |
| 710 return headers |
| 711 |
| 712 |
| 713 def _StartBackendSearchCall( |
| 714 mr, query_project_names, shard_id, invalidation_timestep, |
| 715 deadline=None, failfast=True): |
| 716 """Ask a backend to query one shard of the database.""" |
| 717 backend_host = modules.get_hostname(module='besearch') |
| 718 url = 'http://%s%s' % (backend_host, framework_helpers.FormatURL( |
| 719 mr, urls.BACKEND_SEARCH, |
| 720 skip_filtering=True, # TODO(jrobbins): remove after next release. |
| 721 projects=','.join(query_project_names), |
| 722 start=0, num=mr.start + mr.num, |
| 723 logged_in_user_id=mr.auth.user_id or 0, |
| 724 me_user_id=mr.me_user_id, shard_id=shard_id, |
| 725 invalidation_timestep=invalidation_timestep)) |
| 726 logging.info('\n\nCalling backend: %s', url) |
| 727 rpc = urlfetch.create_rpc( |
| 728 deadline=deadline or settings.backend_deadline) |
| 729 headers = _MakeBackendRequestHeaders(failfast) |
| 730 # follow_redirects=False is needed to avoid a login screen on googleplex. |
| 731 urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers) |
| 732 return rpc |
| 733 |
| 734 |
| 735 def _StartBackendNonviewableCall( |
| 736 project_id, logged_in_user_id, shard_id, invalidation_timestep, |
| 737 deadline=None, failfast=True): |
| 738 """Ask a backend to query one shard of the database.""" |
| 739 backend_host = modules.get_hostname(module='besearch') |
| 740 url = 'http://%s%s' % (backend_host, framework_helpers.FormatURL( |
| 741 None, urls.BACKEND_NONVIEWABLE, |
| 742 project_id=project_id or '', |
| 743 logged_in_user_id=logged_in_user_id or '', |
| 744 shard_id=shard_id, |
| 745 invalidation_timestep=invalidation_timestep)) |
| 746 logging.info('Calling backend nonviewable: %s', url) |
| 747 rpc = urlfetch.create_rpc(deadline=deadline or settings.backend_deadline) |
| 748 headers = _MakeBackendRequestHeaders(failfast) |
| 749 # follow_redirects=False is needed to avoid a login screen on googleplex. |
| 750 urlfetch.make_fetch_call(rpc, url, follow_redirects=False, headers=headers) |
| 751 return rpc |
| 752 |
| 753 |
| 754 def _HandleBackendSearchResponse( |
| 755 mr, query_project_names, rpc_tuple, rpc_tuples, remaining_retries, |
| 756 unfiltered_iids, search_limit_reached, invalidation_timestep, |
| 757 error_responses): |
| 758 """Process one backend response and retry if there was an error.""" |
| 759 start_time, shard_id, rpc = rpc_tuple |
| 760 duration_sec = time.time() - start_time |
| 761 |
| 762 try: |
| 763 response = rpc.get_result() |
| 764 logging.info('call to backend took %d sec', duration_sec) |
| 765 # Note that response.content has "})]'\n" prepended to it. |
| 766 json_content = response.content[5:] |
| 767 logging.info('got json text: %r length %r', |
| 768 json_content[:framework_constants.LOGGING_MAX_LENGTH], |
| 769 len(json_content)) |
| 770 json_data = json.loads(json_content) |
| 771 unfiltered_iids[shard_id] = json_data['unfiltered_iids'] |
| 772 search_limit_reached[shard_id] = json_data['search_limit_reached'] |
| 773 |
| 774 except Exception as e: |
| 775 if duration_sec > FAIL_FAST_LIMIT_SEC: # Don't log fail-fast exceptions. |
| 776 logging.exception(e) |
| 777 if not remaining_retries: |
| 778 logging.error('backend search retries exceeded') |
| 779 error_responses.add(shard_id) |
| 780 return # Used all retries, so give up. |
| 781 |
| 782 if duration_sec >= settings.backend_deadline: |
| 783 logging.error('backend search on %r took too long', shard_id) |
| 784 error_responses.add(shard_id) |
| 785 return # That backend shard is overloaded, so give up. |
| 786 |
| 787 logging.error('backend call for shard %r failed, retrying', shard_id) |
| 788 retry_rpc = _StartBackendSearchCall( |
| 789 mr, query_project_names, shard_id, invalidation_timestep, |
| 790 failfast=remaining_retries > 2) |
| 791 retry_rpc_tuple = (time.time(), shard_id, retry_rpc) |
| 792 retry_rpc.callback = _MakeBackendCallback( |
| 793 _HandleBackendSearchResponse, mr, query_project_names, |
| 794 retry_rpc_tuple, rpc_tuples, remaining_retries - 1, unfiltered_iids, |
| 795 search_limit_reached, invalidation_timestep, error_responses) |
| 796 rpc_tuples.append(retry_rpc_tuple) |
| 797 |
| 798 |
| 799 def _HandleBackendNonviewableResponse( |
| 800 project_id, logged_in_user_id, shard_id, rpc_tuple, rpc_tuples, |
| 801 remaining_retries, nonviewable_iids, invalidation_timestep): |
| 802 """Process one backend response and retry if there was an error.""" |
| 803 start_time, shard_id, rpc = rpc_tuple |
| 804 duration_sec = time.time() - start_time |
| 805 |
| 806 try: |
| 807 response = rpc.get_result() |
| 808 logging.info('call to backend nonviewable took %d sec', duration_sec) |
| 809 # Note that response.content has "})]'\n" prepended to it. |
| 810 json_content = response.content[5:] |
| 811 logging.info('got json text: %r length %r', |
| 812 json_content[:framework_constants.LOGGING_MAX_LENGTH], |
| 813 len(json_content)) |
| 814 json_data = json.loads(json_content) |
| 815 nonviewable_iids[shard_id] = set(json_data['nonviewable']) |
| 816 |
| 817 except Exception as e: |
| 818 if duration_sec > FAIL_FAST_LIMIT_SEC: # Don't log fail-fast exceptions. |
| 819 logging.exception(e) |
| 820 |
| 821 if not remaining_retries: |
| 822 logging.warn('Used all retries, so give up on shard %r', shard_id) |
| 823 return |
| 824 |
| 825 if duration_sec >= settings.backend_deadline: |
| 826 logging.error('nonviewable call on %r took too long', shard_id) |
| 827 return # That backend shard is overloaded, so give up. |
| 828 |
| 829 logging.error( |
| 830 'backend nonviewable call for shard %r;%r;%r failed, retrying', |
| 831 project_id, logged_in_user_id, shard_id) |
| 832 retry_rpc = _StartBackendNonviewableCall( |
| 833 project_id, logged_in_user_id, shard_id, invalidation_timestep, |
| 834 failfast=remaining_retries > 2) |
| 835 retry_rpc_tuple = (time.time(), shard_id, retry_rpc) |
| 836 retry_rpc.callback = _MakeBackendCallback( |
| 837 _HandleBackendNonviewableResponse, project_id, logged_in_user_id, |
| 838 shard_id, retry_rpc_tuple, rpc_tuples, remaining_retries - 1, |
| 839 nonviewable_iids, invalidation_timestep) |
| 840 rpc_tuples.append(retry_rpc_tuple) |
| 841 |
| 842 |
| 843 def _TotalLength(sharded_iids): |
| 844 """Return the total length of all issue_iids lists.""" |
| 845 return sum(len(issue_iids) for issue_iids in sharded_iids.itervalues()) |
| 846 |
| 847 |
| 848 def _ReverseShards(sharded_iids): |
| 849 """Reverse each issue_iids list in place.""" |
| 850 for shard_id in sharded_iids: |
| 851 sharded_iids[shard_id].reverse() |
| 852 |
| 853 |
| 854 def _TrimEndShardedIIDs(sharded_iids, sample_iids, num_needed): |
| 855 """Trim the IIDs to keep at least num_needed items. |
| 856 |
| 857 Args: |
| 858 sharded_iids: dict {shard_id: issue_id_list} for search results. This is |
| 859 modified in place to remove some trailing issue IDs. |
| 860 sample_iids: list of IIDs from a sorted list of sample issues. |
| 861 num_needed: int minimum total number of items to keep. Some IIDs that are |
| 862 known to belong in positions > num_needed will be trimmed off. |
| 863 |
| 864 Returns: |
| 865 The total number of IIDs removed from the IID lists. |
| 866 """ |
| 867 # 1. Get (sample_iid, position_in_shard) for each sample. |
| 868 sample_positions = _CalcSamplePositions(sharded_iids, sample_iids) |
| 869 |
| 870 # 2. Walk through the samples, computing a combined lower bound at each |
| 871 # step until we know that we have passed at least num_needed IIDs. |
| 872 lower_bound_per_shard = {} |
| 873 excess_samples = [] |
| 874 for i in range(len(sample_positions)): |
| 875 sample_iid, pos = sample_positions[i] |
| 876 shard_id = sample_iid % settings.num_logical_shards |
| 877 lower_bound_per_shard[shard_id] = pos |
| 878 overall_lower_bound = sum(lower_bound_per_shard.itervalues()) |
| 879 if overall_lower_bound >= num_needed: |
| 880 excess_samples = sample_positions[i + 1:] |
| 881 break |
| 882 else: |
| 883 return 0 # We went through all samples and never reached num_needed. |
| 884 |
| 885 # 3. Truncate each shard at the first excess sample in that shard. |
| 886 already_trimmed = set() |
| 887 num_trimmed = 0 |
| 888 for sample_iid, pos in excess_samples: |
| 889 shard_id = sample_iid % settings.num_logical_shards |
| 890 if shard_id not in already_trimmed: |
| 891 num_trimmed += len(sharded_iids[shard_id]) - pos |
| 892 sharded_iids[shard_id] = sharded_iids[shard_id][:pos] |
| 893 already_trimmed.add(shard_id) |
| 894 |
| 895 return num_trimmed |
| 896 |
| 897 |
| 898 # TODO(jrobbins): Convert this to a python generator. |
| 899 def _CalcSamplePositions(sharded_iids, sample_iids): |
| 900 """Return [(sample_iid, position_in_shard), ...] for each sample.""" |
| 901 # We keep track of how far index() has scanned in each shard to avoid |
| 902 # starting over at position 0 when looking for the next sample in |
| 903 # the same shard. |
| 904 scan_positions = collections.defaultdict(lambda: 0) |
| 905 sample_positions = [] |
| 906 for sample_iid in sample_iids: |
| 907 shard_id = sample_iid % settings.num_logical_shards |
| 908 try: |
| 909 pos = sharded_iids.get(shard_id, []).index( |
| 910 sample_iid, scan_positions[shard_id]) |
| 911 scan_positions[shard_id] = pos |
| 912 sample_positions.append((sample_iid, pos)) |
| 913 except ValueError: |
| 914 pass |
| 915 |
| 916 return sample_positions |
| 917 |
| 918 |
| 919 def _SortIssues(mr, issues, config, users_by_id): |
| 920 """Sort the found issues based on the request and config values. |
| 921 |
| 922 Args: |
| 923 mr: common information parsed from the HTTP request. |
| 924 issues: A list of issues to be sorted. |
| 925 config: A ProjectIssueConfig that could impact sort order. |
| 926 users_by_id: dictionary {user_id: user_view,...} for all users who |
| 927 participate in any issue in the entire list. |
| 928 |
| 929 Returns: |
| 930 A sorted list of issues, based on parameters from mr and config. |
| 931 """ |
| 932 issues = sorting.SortArtifacts( |
| 933 mr, issues, config, tracker_helpers.SORTABLE_FIELDS, |
| 934 username_cols=tracker_constants.USERNAME_COLS, users_by_id=users_by_id) |
| 935 return issues |