Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(893)

Side by Side Diff: appengine/swarming/server/task_scheduler.py

Issue 2012843003: Remove use of search API; effectively remove support to search for task by name. (Closed) Base URL: git@github.com:luci/luci-py.git@master
Patch Set: . Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # Copyright 2014 The LUCI Authors. All rights reserved. 1 # Copyright 2014 The LUCI Authors. All rights reserved.
2 # Use of this source code is governed under the Apache License, Version 2.0 2 # Use of this source code is governed under the Apache License, Version 2.0
3 # that can be found in the LICENSE file. 3 # that can be found in the LICENSE file.
4 4
5 """High level tasks execution scheduling API. 5 """High level tasks execution scheduling API.
6 6
7 This is the interface closest to the HTTP handlers. 7 This is the interface closest to the HTTP handlers.
8 """ 8 """
9 9
10 import contextlib 10 import contextlib
11 import datetime 11 import datetime
12 import logging 12 import logging
13 import math 13 import math
14 import random 14 import random
15 15
16 from google.appengine.api import datastore_errors 16 from google.appengine.api import datastore_errors
17 from google.appengine.api import search
18 from google.appengine.ext import ndb 17 from google.appengine.ext import ndb
19 from google.appengine.runtime import apiproxy_errors 18 from google.appengine.runtime import apiproxy_errors
20 19
21 from components import datastore_utils 20 from components import datastore_utils
22 from components import pubsub 21 from components import pubsub
23 from components import utils 22 from components import utils
24 import ts_mon_metrics 23 import ts_mon_metrics
25 from server import config 24 from server import config
26 from server import stats 25 from server import stats
27 from server import task_pack 26 from server import task_pack
(...skipping 376 matching lines...) Expand 10 before | Expand all | Expand 10 after
404 # At this point, the request is now in the DB but not yet in a mode where it 403 # At this point, the request is now in the DB but not yet in a mode where it
405 # can be triggered or visible. Index it right away so it is searchable. If any 404 # can be triggered or visible. Index it right away so it is searchable. If any
406 # of remaining calls in this function fail, the TaskRequest and Search 405 # of remaining calls in this function fail, the TaskRequest and Search
407 # Document will simply point to an incomplete task, which will be ignored. 406 # Document will simply point to an incomplete task, which will be ignored.
408 # 407 #
409 # Creates the entities TaskToRun and TaskResultSummary but do not save them 408 # Creates the entities TaskToRun and TaskResultSummary but do not save them
410 # yet. TaskRunResult will be created once a bot starts it. 409 # yet. TaskRunResult will be created once a bot starts it.
411 task = task_to_run.new_task_to_run(request) 410 task = task_to_run.new_task_to_run(request)
412 result_summary = task_result.new_result_summary(request) 411 result_summary = task_result.new_result_summary(request)
413 412
414 # Do not specify a doc_id, as they are guaranteed to be monotonically
415 # increasing and searches are done in reverse order, which fits exactly the
416 # created_ts ordering. This is useful because DateField is precise to the date
417 # (!) and NumberField is signed 32 bits so the best it could do with EPOCH is
418 # second resolution up to year 2038.
419 index = search.Index(name='requests')
420 packed = task_pack.pack_result_summary_key(result_summary.key)
421 doc = search.Document(
422 fields=[
423 search.TextField(name='name', value=request.name),
424 search.AtomField(name='id', value=packed),
425 ])
426 # Even if it fails here, we're still fine, as the task is not "alive" yet.
427 search_future = index.put_async([doc])
428
429 now = utils.utcnow() 413 now = utils.utcnow()
430 414
431 if dupe_future: 415 if dupe_future:
432 # Reuse the results! 416 # Reuse the results!
433 dupe_summary = dupe_future.get_result() 417 dupe_summary = dupe_future.get_result()
434 # Refuse tasks older than X days. This is due to the isolate server dropping 418 # Refuse tasks older than X days. This is due to the isolate server dropping
435 # files. https://code.google.com/p/swarming/issues/detail?id=197 419 # files. https://code.google.com/p/swarming/issues/detail?id=197
436 oldest = now - datetime.timedelta( 420 oldest = now - datetime.timedelta(
437 seconds=config.settings().reusable_task_age_secs) 421 seconds=config.settings().reusable_task_age_secs)
438 if dupe_summary and dupe_summary.created_ts > oldest: 422 if dupe_summary and dupe_summary.created_ts > oldest:
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
476 for item in items: 460 for item in items:
477 item.children_task_ids.append(k) 461 item.children_task_ids.append(k)
478 item.modified_ts = now 462 item.modified_ts = now
479 ndb.put_multi(items) 463 ndb.put_multi(items)
480 464
481 # Raising will abort to the caller. 465 # Raising will abort to the caller.
482 futures = [datastore_utils.transaction_async(run)] 466 futures = [datastore_utils.transaction_async(run)]
483 if parent_task_keys: 467 if parent_task_keys:
484 futures.append(datastore_utils.transaction_async(run_parent)) 468 futures.append(datastore_utils.transaction_async(run_parent))
485 469
486 try:
487 search_future.get_result()
488 except search.Error:
489 # Do not abort the task, for now search is best effort.
490 logging.exception('Put failed')
491
492 for future in futures: 470 for future in futures:
493 # Check for failures, it would raise in this case, aborting the call. 471 # Check for failures, it would raise in this case, aborting the call.
494 future.get_result() 472 future.get_result()
495 473
496 stats.add_task_entry( 474 stats.add_task_entry(
497 'task_enqueued', result_summary.key, 475 'task_enqueued', result_summary.key,
498 dimensions=request.properties.dimensions, 476 dimensions=request.properties.dimensions,
499 user=request.user) 477 user=request.user)
500 return result_summary 478 return result_summary
501 479
(...skipping 390 matching lines...) Expand 10 before | Expand all | Expand 10 after
892 ## Task queue tasks. 870 ## Task queue tasks.
893 871
894 872
895 def task_handle_pubsub_task(payload): 873 def task_handle_pubsub_task(payload):
896 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" 874 """Handles task enqueued by _maybe_pubsub_notify_via_tq."""
897 # Do not catch errors to trigger task queue task retry. Errors should not 875 # Do not catch errors to trigger task queue task retry. Errors should not
898 # happen in normal case. 876 # happen in normal case.
899 _pubsub_notify( 877 _pubsub_notify(
900 payload['task_id'], payload['topic'], 878 payload['task_id'], payload['topic'],
901 payload['auth_token'], payload['userdata']) 879 payload['auth_token'], payload['userdata'])
OLDNEW
« no previous file with comments | « appengine/swarming/server/task_result_test.py ('k') | appengine/swarming/server/task_scheduler_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698