Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
| 3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
| 4 | 4 |
| 5 """High level tasks execution scheduling API. | 5 """High level tasks execution scheduling API. |
| 6 | 6 |
| 7 This is the interface closest to the HTTP handlers. | 7 This is the interface closest to the HTTP handlers. |
| 8 """ | 8 """ |
| 9 | 9 |
| 10 import datetime | 10 import datetime |
| (...skipping 456 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 467 # | 467 # |
| 468 # Do not use "cls.created_ts > oldest" here because this would require a | 468 # Do not use "cls.created_ts > oldest" here because this would require a |
| 469 # composite index. It's unnecessary because TaskRequest.key is equivalent to | 469 # composite index. It's unnecessary because TaskRequest.key is equivalent to |
| 470 # decreasing TaskRequest.created_ts, ordering by key works as well and | 470 # decreasing TaskRequest.created_ts, ordering by key works as well and |
| 471 # doesn't require a composite index. | 471 # doesn't require a composite index. |
| 472 cls = task_result.TaskResultSummary | 472 cls = task_result.TaskResultSummary |
| 473 h = request.properties.properties_hash | 473 h = request.properties.properties_hash |
| 474 | 474 |
| 475 # TODO(maruel): Make a reverse map on successful task completion so this | 475 # TODO(maruel): Make a reverse map on successful task completion so this |
| 476 # becomes a simple ndb.get(). | 476 # becomes a simple ndb.get(). |
| 477 dupe_summary = cls.query(cls.properties_hash==h).order(cls.key).get() | 477 count = 0 |
| 478 if dupe_summary: | 478 for dupe_summary in cls.query(cls.properties_hash==h): |
|
Vadim Sh.
2016/08/31 19:46:06
no order by key anymore?
Vadim Sh.
2016/08/31 19:46:06
nit:
for i, dupe_summary in enumerate(....):
...
M-A Ruel
2016/08/31 20:26:30
Done.
M-A Ruel
2016/08/31 20:26:30
Hmm, not sure why I had removed it; added it back.
| |
| 479 count += 1 | |
| 480 # It is possible for the query to return stale items. | |
| 481 if (dupe_summary.state != task_result.State.COMPLETED or | |
| 482 dupe_summary.failure): | |
| 483 if count == 3: | |
|
Vadim Sh.
2016/08/31 19:46:06
Then also use a smaller batch_size (it is 20 by default).
M-A Ruel
2016/08/31 20:26:30
Done.
| |
| 484 # Indexes are very inconsistent, give up. | |
| 485 break | |
| 486 continue | |
| 479 # Refuse tasks older than X days. This is due to the isolate server | 487 # Refuse tasks older than X days. This is due to the isolate server |
| 480 # dropping files. | 488 # dropping files. |
| 481 # TODO(maruel): The value should be calculated from the isolate server | 489 # TODO(maruel): The value should be calculated from the isolate server |
| 482 # setting and be unbounded when no isolated input was used. | 490 # setting and be unbounded when no isolated input was used. |
| 483 oldest = now - datetime.timedelta( | 491 oldest = now - datetime.timedelta( |
| 484 seconds=config.settings().reusable_task_age_secs) | 492 seconds=config.settings().reusable_task_age_secs) |
| 485 if dupe_summary and dupe_summary.created_ts > oldest: | 493 if dupe_summary.created_ts <= oldest: |
| 486 # Setting task.queue_number to None removes it from the scheduling. | 494 # Task is too old. |
| 487 task.queue_number = None | 495 break |
| 488 _copy_summary( | 496 # Setting task.queue_number to None removes it from the scheduling. |
|
Vadim Sh.
2016/08/31 19:46:06
nit: break out of the loop here and do the rest of the work outside the loop.
M-A Ruel
2016/08/31 20:26:30
Refactored searching code into a function.
| |
| 489 dupe_summary, result_summary, | 497 task.queue_number = None |
| 490 ('created_ts', 'modified_ts', 'name', 'user', 'tags')) | 498 _copy_summary( |
| 491 # Zap irrelevant properties. PerformanceStats is also not copied over, | 499 dupe_summary, result_summary, |
| 492 # since it's not relevant. | 500 ('created_ts', 'modified_ts', 'name', 'user', 'tags')) |
| 493 result_summary.properties_hash = None | 501 # Zap irrelevant properties. PerformanceStats is also not copied over, |
| 494 result_summary.try_number = 0 | 502 # since it's not relevant. |
| 495 result_summary.cost_saved_usd = result_summary.cost_usd | 503 result_summary.properties_hash = None |
| 496 # Only zap after. | 504 result_summary.try_number = 0 |
| 497 result_summary.costs_usd = [] | 505 result_summary.cost_saved_usd = result_summary.cost_usd |
| 498 result_summary.deduped_from = task_pack.pack_run_result_key( | 506 # Only zap after. |
| 499 dupe_summary.run_result_key) | 507 result_summary.costs_usd = [] |
| 500 # In this code path, there's not much to do as the task will not be run, | 508 result_summary.deduped_from = task_pack.pack_run_result_key( |
| 501 # previous results are returned. We still need to store all the entities | 509 dupe_summary.run_result_key) |
| 502 # correctly. | 510 # In this code path, there's not much to do as the task will not be run, |
| 503 datastore_utils.insert( | 511 # previous results are returned. We still need to store all the entities |
| 504 request, get_new_keys, extra=[task, result_summary]) | 512 # correctly. |
| 505 logging.debug( | 513 datastore_utils.insert( |
| 506 'New request %s reusing %s', result_summary.task_id, | 514 request, get_new_keys, extra=[task, result_summary]) |
| 507 dupe_summary.task_id) | 515 logging.debug( |
| 508 stored = True | 516 'New request %s reusing %s', result_summary.task_id, |
| 509 deduped = True | 517 dupe_summary.task_id) |
| 518 stored = True | |
| 519 deduped = True | |
| 520 break | |
| 510 | 521 |
| 511 if not stored: | 522 if not stored: |
| 512 # Storing these entities makes this task live. It is important at this point | 523 # Storing these entities makes this task live. It is important at this point |
| 513 # that the HTTP handler returns as fast as possible, otherwise the task will | 524 # that the HTTP handler returns as fast as possible, otherwise the task will |
| 514 # be run but the client will not know about it. | 525 # be run but the client will not know about it. |
| 515 datastore_utils.insert(request, get_new_keys, extra=[task, result_summary]) | 526 datastore_utils.insert(request, get_new_keys, extra=[task, result_summary]) |
| 516 logging.debug('New request %s', result_summary.task_id) | 527 logging.debug('New request %s', result_summary.task_id) |
| 517 | 528 |
| 518 # Get parent task details if applicable. | 529 # Get parent task details if applicable. |
| 519 if request.parent_task_id: | 530 if request.parent_task_id: |
| (...skipping 433 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 953 ## Task queue tasks. | 964 ## Task queue tasks. |
| 954 | 965 |
| 955 | 966 |
| 956 def task_handle_pubsub_task(payload): | 967 def task_handle_pubsub_task(payload): |
| 957 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" | 968 """Handles task enqueued by _maybe_pubsub_notify_via_tq.""" |
| 958 # Do not catch errors to trigger task queue task retry. Errors should not | 969 # Do not catch errors to trigger task queue task retry. Errors should not |
| 959 # happen in normal case. | 970 # happen in normal case. |
| 960 _pubsub_notify( | 971 _pubsub_notify( |
| 961 payload['task_id'], payload['topic'], | 972 payload['task_id'], payload['topic'], |
| 962 payload['auth_token'], payload['userdata']) | 973 payload['auth_token'], payload['userdata']) |
| OLD | NEW |