| OLD | NEW |
| 1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """High level tasks execution scheduling API. | 5 """High level tasks execution scheduling API. |
| 6 | 6 |
| 7 This is the interface closest to the HTTP handlers. | 7 This is the interface closest to the HTTP handlers. |
| 8 """ | 8 """ |
| 9 | 9 |
| 10 import contextlib | 10 import contextlib |
| 11 import datetime | 11 import datetime |
| 12 import logging | 12 import logging |
| 13 import math | 13 import math |
| 14 import random | 14 import random |
| 15 | 15 |
| 16 from google.appengine.api import datastore_errors | 16 from google.appengine.api import datastore_errors |
| 17 from google.appengine.api import search | |
| 18 from google.appengine.ext import ndb | 17 from google.appengine.ext import ndb |
| 19 from google.appengine.runtime import apiproxy_errors | 18 from google.appengine.runtime import apiproxy_errors |
| 20 | 19 |
| 21 from components import datastore_utils | 20 from components import datastore_utils |
| 22 from components import pubsub | 21 from components import pubsub |
| 23 from components import utils | 22 from components import utils |
| 24 import ts_mon_metrics | 23 import ts_mon_metrics |
| 25 from server import config | 24 from server import config |
| 26 from server import stats | 25 from server import stats |
| 27 from server import task_pack | 26 from server import task_pack |
| (...skipping 376 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 404 # At this point, the request is now in the DB but not yet in a mode where it | 403 # At this point, the request is now in the DB but not yet in a mode where it |
| 405 # can be triggered or visible. Index it right away so it is searchable. If any | 404 # can be triggered or visible. Index it right away so it is searchable. If any |
| 406 # of remaining calls in this function fail, the TaskRequest and Search | 405 # of remaining calls in this function fail, the TaskRequest and Search |
| 407 # Document will simply point to an incomplete task, which will be ignored. | 406 # Document will simply point to an incomplete task, which will be ignored. |
| 408 # | 407 # |
| 409 # Creates the entities TaskToRun and TaskResultSummary but do not save them | 408 # Creates the entities TaskToRun and TaskResultSummary but do not save them |
| 410 # yet. TaskRunResult will be created once a bot starts it. | 409 # yet. TaskRunResult will be created once a bot starts it. |
| 411 task = task_to_run.new_task_to_run(request) | 410 task = task_to_run.new_task_to_run(request) |
| 412 result_summary = task_result.new_result_summary(request) | 411 result_summary = task_result.new_result_summary(request) |
| 413 | 412 |
| 414 # Do not specify a doc_id, as they are guaranteed to be monotonically | |
| 415 # increasing and searches are done in reverse order, which fits exactly the | |
| 416 # created_ts ordering. This is useful because DateField is precise to the date | |
| 417 # (!) and NumberField is signed 32 bits so the best it could do with EPOCH is | |
| 418 # second resolution up to year 2038. | |
| 419 index = search.Index(name='requests') | |
| 420 packed = task_pack.pack_result_summary_key(result_summary.key) | |
| 421 doc = search.Document( | |
| 422 fields=[ | |
| 423 search.TextField(name='name', value=request.name), | |
| 424 search.AtomField(name='id', value=packed), | |
| 425 ]) | |
| 426 # Even if it fails here, we're still fine, as the task is not "alive" yet. | |
| 427 search_future = index.put_async([doc]) | |
| 428 | |
| 429 now = utils.utcnow() | 413 now = utils.utcnow() |
| 430 | 414 |
| 431 if dupe_future: | 415 if dupe_future: |
| 432 # Reuse the results! | 416 # Reuse the results! |
| 433 dupe_summary = dupe_future.get_result() | 417 dupe_summary = dupe_future.get_result() |
| 434 # Refuse tasks older than X days. This is due to the isolate server dropping | 418 # Refuse tasks older than X days. This is due to the isolate server dropping |
| 435 # files. https://code.google.com/p/swarming/issues/detail?id=197 | 419 # files. https://code.google.com/p/swarming/issues/detail?id=197 |
| 436 oldest = now - datetime.timedelta( | 420 oldest = now - datetime.timedelta( |
| 437 seconds=config.settings().reusable_task_age_secs) | 421 seconds=config.settings().reusable_task_age_secs) |
| 438 if dupe_summary and dupe_summary.created_ts > oldest: | 422 if dupe_summary and dupe_summary.created_ts > oldest: |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 476 for item in items: | 460 for item in items: |
| 477 item.children_task_ids.append(k) | 461 item.children_task_ids.append(k) |
| 478 item.modified_ts = now | 462 item.modified_ts = now |
| 479 ndb.put_multi(items) | 463 ndb.put_multi(items) |
| 480 | 464 |
| 481 # Raising will abort to the caller. | 465 # Raising will abort to the caller. |
| 482 futures = [datastore_utils.transaction_async(run)] | 466 futures = [datastore_utils.transaction_async(run)] |
| 483 if parent_task_keys: | 467 if parent_task_keys: |
| 484 futures.append(datastore_utils.transaction_async(run_parent)) | 468 futures.append(datastore_utils.transaction_async(run_parent)) |
| 485 | 469 |
| 486 try: | |
| 487 search_future.get_result() | |
| 488 except search.Error: | |
| 489 # Do not abort the task, for now search is best effort. | |
| 490 logging.exception('Put failed') | |
| 491 | |
| 492 for future in futures: | 470 for future in futures: |
| 493 # Check for failures, it would raise in this case, aborting the call. | 471 # Check for failures, it would raise in this case, aborting the call. |
| 494 future.get_result() | 472 future.get_result() |
| 495 | 473 |
| 496 stats.add_task_entry( | 474 stats.add_task_entry( |
| 497 'task_enqueued', result_summary.key, | 475 'task_enqueued', result_summary.key, |
| 498 dimensions=request.properties.dimensions, | 476 dimensions=request.properties.dimensions, |
| 499 user=request.user) | 477 user=request.user) |
| 500 return result_summary | 478 return result_summary |
| 501 | 479 |
| (...skipping 390 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 892 ## Task queue tasks. | 870 ## Task queue tasks. |
| 893 | 871 |
| 894 | 872 |
| 895 def task_handle_pubsub_task(payload): | 873 def task_handle_pubsub_task(payload): |
| 896 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" | 874 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
| 897 # Do not catch errors to trigger task queue task retry. Errors should not | 875 # Do not catch errors to trigger task queue task retry. Errors should not |
| 898 # happen in normal case. | 876 # happen in normal case. |
| 899 _pubsub_notify( | 877 _pubsub_notify( |
| 900 payload['task_id'], payload['topic'], | 878 payload['task_id'], payload['topic'], |
| 901 payload['auth_token'], payload['userdata']) | 879 payload['auth_token'], payload['userdata']) |
| OLD | NEW |