Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """High level tasks execution scheduling API. | 5 """High level tasks execution scheduling API. |
| 6 | 6 |
| 7 This is the interface closest to the HTTP handlers. | 7 This is the interface closest to the HTTP handlers. |
| 8 """ | 8 """ |
| 9 | 9 |
| 10 import datetime | 10 import datetime |
| (...skipping 384 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 395 k: dimension name. | 395 k: dimension name. |
| 396 v: dimension value. | 396 v: dimension value. |
| 397 """ | 397 """ |
| 398 for e in dim_acls.entry: | 398 for e in dim_acls.entry: |
| 399 if '%s:%s' % (k, v) in e.dimension or '%s:*' % k in e.dimension: | 399 if '%s:%s' % (k, v) in e.dimension or '%s:*' % k in e.dimension: |
| 400 return auth.is_group_member(e.usable_by, ident) | 400 return auth.is_group_member(e.usable_by, ident) |
| 401 # A dimension not mentioned in 'dimension_acls' is allowed by default. | 401 # A dimension not mentioned in 'dimension_acls' is allowed by default. |
| 402 return True | 402 return True |
| 403 | 403 |
| 404 | 404 |
| 405 def _find_dupe_task(now, h): | |
| 406 """Finds a previously run task that is also idempotent and completed. | |
| 407 | |
| 408 Fetch items that can be used to dedupe the task. See the comment for this | |
| 409 property for more details. | |
| 410 | |
| 411 Do not use "task_result.TaskResultSummary.created_ts > oldest" here because | |
| 412 this would require a composite index. It's unnecessary because TaskRequest.key | |
| 413 is equivalent to decreasing TaskRequest.created_ts, ordering by key works as | |
| 414 well and doesn't require a composite index. | |
| 415 """ | |
| 416 # TODO(maruel): Make a reverse map on successful task completion so this | |
| 417 # becomes a simple ndb.get(). | |
| 418 cls = task_result.TaskResultSummary | |
| 419 q = cls.query(cls.properties_hash==h).order(cls.key) | |
| 420 for i, dupe_summary in enumerate(q.iter(batch_size=1)): | |
|
M-A Ruel
2016/08/31 20:26:30
Using batch_size=1 since the general case is to only need the first result from the query.
| |
| 421 # It is possible for the query to return stale items. | |
| 422 if (dupe_summary.state != task_result.State.COMPLETED or | |
| 423 dupe_summary.failure): | |
| 424 if i == 2: | |
| 425 # Indexes are very inconsistent, give up. | |
| 426 return None | |
| 427 continue | |
| 428 | |
| 429 # Refuse tasks older than X days. This is due to the isolate server | |
| 430 # dropping files. | |
| 431 # TODO(maruel): The value should be calculated from the isolate server | |
| 432 # setting and be unbounded when no isolated input was used. | |
| 433 oldest = now - datetime.timedelta( | |
| 434 seconds=config.settings().reusable_task_age_secs) | |
| 435 if dupe_summary.created_ts <= oldest: | |
| 436 return None | |
| 437 return dupe_summary | |
| 438 return None | |
| 439 | |
| 440 | |
| 405 ### Public API. | 441 ### Public API. |
| 406 | 442 |
| 407 | 443 |
| 408 def exponential_backoff(attempt_num): | 444 def exponential_backoff(attempt_num): |
| 409 """Returns an exponential backoff value in seconds.""" | 445 """Returns an exponential backoff value in seconds.""" |
| 410 assert attempt_num >= 0 | 446 assert attempt_num >= 0 |
| 411 if random.random() < _PROBABILITY_OF_QUICK_COMEBACK: | 447 if random.random() < _PROBABILITY_OF_QUICK_COMEBACK: |
| 412 # Randomly ask the bot to return quickly. | 448 # Randomly ask the bot to return quickly. |
| 413 return 1.0 | 449 return 1.0 |
| 414 | 450 |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 452 def get_new_keys(): | 488 def get_new_keys(): |
| 453 # Warning: this assumes knowledge about the hierarchy of each entity. | 489 # Warning: this assumes knowledge about the hierarchy of each entity. |
| 454 key = task_request.new_request_key() | 490 key = task_request.new_request_key() |
| 455 task.key.parent = key | 491 task.key.parent = key |
| 456 old = result_summary.task_id | 492 old = result_summary.task_id |
| 457 result_summary.parent = key | 493 result_summary.parent = key |
| 458 logging.info('%s conflicted, using %s', old, result_summary.task_id) | 494 logging.info('%s conflicted, using %s', old, result_summary.task_id) |
| 459 return key | 495 return key |
| 460 | 496 |
| 461 deduped = False | 497 deduped = False |
| 462 stored = False | |
| 463 if request.properties.idempotent: | 498 if request.properties.idempotent: |
| 464 # Find a previously run task that is also idempotent and completed. Start a | 499 dupe_summary = _find_dupe_task(now, request.properties.properties_hash) |
| 465 # query to fetch items that can be used to dedupe the task. See the comment | 500 if dupe_summary: |
| 466 # for this property for more details. | 501 # Setting task.queue_number to None removes it from the scheduling. |
| 467 # | 502 task.queue_number = None |
| 468 # Do not use "cls.created_ts > oldest" here because this would require a | 503 _copy_summary( |
| 469 # composite index. It's unnecessary because TaskRequest.key is equivalent to | 504 dupe_summary, result_summary, |
| 470 # decreasing TaskRequest.created_ts, ordering by key works as well and | 505 ('created_ts', 'modified_ts', 'name', 'user', 'tags')) |
| 471 # doesn't require a composite index. | 506 # Zap irrelevant properties. PerformanceStats is also not copied over, |
| 472 cls = task_result.TaskResultSummary | 507 # since it's not relevant. |
| 473 h = request.properties.properties_hash | 508 result_summary.properties_hash = None |
| 509 result_summary.try_number = 0 | |
| 510 result_summary.cost_saved_usd = result_summary.cost_usd | |
| 511 # Only zap after. | |
| 512 result_summary.costs_usd = [] | |
| 513 result_summary.deduped_from = task_pack.pack_run_result_key( | |
| 514 dupe_summary.run_result_key) | |
| 515 # In this code path, there's not much to do as the task will not be run, | |
| 516 # previous results are returned. We still need to store all the entities | |
| 517 # correctly. | |
| 518 datastore_utils.insert( | |
| 519 request, get_new_keys, extra=[task, result_summary]) | |
| 520 logging.debug( | |
| 521 'New request %s reusing %s', result_summary.task_id, | |
| 522 dupe_summary.task_id) | |
| 523 deduped = True | |
| 474 | 524 |
| 475 # TODO(maruel): Make a reverse map on successful task completion so this | 525 if not deduped: |
| 476 # becomes a simple ndb.get(). | |
| 477 dupe_summary = cls.query(cls.properties_hash==h).order(cls.key).get() | |
| 478 if dupe_summary: | |
| 479 # Refuse tasks older than X days. This is due to the isolate server | |
| 480 # dropping files. | |
| 481 # TODO(maruel): The value should be calculated from the isolate server | |
| 482 # setting and be unbounded when no isolated input was used. | |
| 483 oldest = now - datetime.timedelta( | |
| 484 seconds=config.settings().reusable_task_age_secs) | |
| 485 if dupe_summary and dupe_summary.created_ts > oldest: | |
| 486 # Setting task.queue_number to None removes it from the scheduling. | |
| 487 task.queue_number = None | |
| 488 _copy_summary( | |
| 489 dupe_summary, result_summary, | |
| 490 ('created_ts', 'modified_ts', 'name', 'user', 'tags')) | |
| 491 # Zap irrelevant properties. PerformanceStats is also not copied over, | |
| 492 # since it's not relevant. | |
| 493 result_summary.properties_hash = None | |
| 494 result_summary.try_number = 0 | |
| 495 result_summary.cost_saved_usd = result_summary.cost_usd | |
| 496 # Only zap after. | |
| 497 result_summary.costs_usd = [] | |
| 498 result_summary.deduped_from = task_pack.pack_run_result_key( | |
| 499 dupe_summary.run_result_key) | |
| 500 # In this code path, there's not much to do as the task will not be run, | |
| 501 # previous results are returned. We still need to store all the entities | |
| 502 # correctly. | |
| 503 datastore_utils.insert( | |
| 504 request, get_new_keys, extra=[task, result_summary]) | |
| 505 logging.debug( | |
| 506 'New request %s reusing %s', result_summary.task_id, | |
| 507 dupe_summary.task_id) | |
| 508 stored = True | |
| 509 deduped = True | |
| 510 | |
| 511 if not stored: | |
| 512 # Storing these entities makes this task live. It is important at this point | 526 # Storing these entities makes this task live. It is important at this point |
| 513 # that the HTTP handler returns as fast as possible, otherwise the task will | 527 # that the HTTP handler returns as fast as possible, otherwise the task will |
| 514 # be run but the client will not know about it. | 528 # be run but the client will not know about it. |
| 515 datastore_utils.insert(request, get_new_keys, extra=[task, result_summary]) | 529 datastore_utils.insert(request, get_new_keys, extra=[task, result_summary]) |
| 516 logging.debug('New request %s', result_summary.task_id) | 530 logging.debug('New request %s', result_summary.task_id) |
| 517 | 531 |
| 518 # Get parent task details if applicable. | 532 # Get parent task details if applicable. |
| 519 if request.parent_task_id: | 533 if request.parent_task_id: |
| 520 parent_run_key = task_pack.unpack_run_result_key(request.parent_task_id) | 534 parent_run_key = task_pack.unpack_run_result_key(request.parent_task_id) |
| 521 parent_task_keys = [ | 535 parent_task_keys = [ |
| (...skipping 431 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 953 ## Task queue tasks. | 967 ## Task queue tasks. |
| 954 | 968 |
| 955 | 969 |
| 956 def task_handle_pubsub_task(payload): | 970 def task_handle_pubsub_task(payload): |
| 957 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" | 971 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
| 958 # Do not catch errors to trigger task queue task retry. Errors should not | 972 # Do not catch errors to trigger task queue task retry. Errors should not |
| 959 # happen in normal case. | 973 # happen in normal case. |
| 960 _pubsub_notify( | 974 _pubsub_notify( |
| 961 payload['task_id'], payload['topic'], | 975 payload['task_id'], payload['topic'], |
| 962 payload['auth_token'], payload['userdata']) | 976 payload['auth_token'], payload['userdata']) |
| OLD | NEW |