OLD | NEW |
---|---|
1 # Copyright 2014 The LUCI Authors. All rights reserved. | 1 # Copyright 2014 The LUCI Authors. All rights reserved. |
2 # Use of this source code is governed under the Apache License, Version 2.0 | 2 # Use of this source code is governed under the Apache License, Version 2.0 |
3 # that can be found in the LICENSE file. | 3 # that can be found in the LICENSE file. |
4 | 4 |
5 """Task execution result models. | 5 """Task execution result models. |
6 | 6 |
7 This module doesn't do the scheduling itself. It only describes the entities to | 7 This module doesn't do the scheduling itself. It only describes the entities to |
8 store tasks results. | 8 store tasks results. |
9 | 9 |
10 - TaskResultSummary represents the overall result for the TaskRequest taking in | 10 - TaskResultSummary represents the overall result for the TaskRequest taking in |
(...skipping 40 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
51 |id=1 (not stored)| |id=1 | | 51 |id=1 (not stored)| |id=1 | |
52 +-----------------+ +----------------+ | 52 +-----------------+ +----------------+ |
53 ^ ^ | 53 ^ ^ |
54 | | | 54 | | |
55 +---------------+ +---------------+ | 55 +---------------+ +---------------+ |
56 |TaskOutputChunk| |TaskOutputChunk| ... | 56 |TaskOutputChunk| |TaskOutputChunk| ... |
57 |id=1 | |id=2 | | 57 |id=1 | |id=2 | |
58 +---------------+ +---------------+ | 58 +---------------+ +---------------+ |
59 """ | 59 """ |
60 | 60 |
61 import collections | |
61 import datetime | 62 import datetime |
62 import logging | 63 import logging |
63 import random | 64 import random |
65 import re | |
64 | 66 |
65 from google.appengine.api import datastore_errors | 67 from google.appengine.api import datastore_errors |
66 from google.appengine.datastore import datastore_query | 68 from google.appengine.datastore import datastore_query |
67 from google.appengine.ext import ndb | 69 from google.appengine.ext import ndb |
68 | 70 |
69 from components import datastore_utils | 71 from components import datastore_utils |
70 from components import utils | 72 from components import utils |
71 from server import large | 73 from server import large |
72 from server import task_pack | 74 from server import task_pack |
73 from server import task_request | 75 from server import task_request |
74 | 76 |
77 import cipd | |
78 | |
75 # Amount of time after which a bot is considered dead. In short, if a bot has | 79 # Amount of time after which a bot is considered dead. In short, if a bot has |
76 # not ping in the last 5 minutes while running a task, it is considered dead. | 80 # not ping in the last 5 minutes while running a task, it is considered dead. |
77 BOT_PING_TOLERANCE = datetime.timedelta(seconds=5*60) | 81 BOT_PING_TOLERANCE = datetime.timedelta(seconds=5*60) |
78 | 82 |
79 | 83 |
80 class State(object): | 84 class State(object): |
81 """States in which a task can be. | 85 """States in which a task can be. |
82 | 86 |
83 It's in fact an enum. Values should be in decreasing order of importance. | 87 It's in fact an enum. Values should be in decreasing order of importance. |
84 """ | 88 """ |
(...skipping 214 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
299 @property | 303 @property |
300 def is_valid(self): | 304 def is_valid(self): |
301 return self.bot_overhead is not None | 305 return self.bot_overhead is not None |
302 | 306 |
303 def _pre_put_hook(self): | 307 def _pre_put_hook(self): |
304 if self.bot_overhead is None: | 308 if self.bot_overhead is None: |
305 raise datastore_errors.BadValueError( | 309 raise datastore_errors.BadValueError( |
306 'PerformanceStats.bot_overhead is required') | 310 'PerformanceStats.bot_overhead is required') |
307 | 311 |
308 | 312 |
313 class CipdPins(ndb.Model): | |
314 """Specifies which CIPD client and packages were actually installed. | |
315 | |
316 A part of _TaskResultCommon. | |
317 """ | |
318 # CIPD package of CIPD client to use. | |
319 # client_package.package_name and version are provided. | |
320 # client_package.path will be None. | |
321 client_package = ndb.LocalStructuredProperty(task_request.CipdPackage) | |
322 | |
323 # List of packages to install in $CIPD_PATH prior task execution. | |
324 packages = ndb.LocalStructuredProperty(task_request.CipdPackage, | |
325 repeated=True) | |
326 | |
327 | |
309 class _TaskResultCommon(ndb.Model): | 328 class _TaskResultCommon(ndb.Model): |
310 """Contains properties that is common to both TaskRunResult and | 329 """Contains properties that is common to both TaskRunResult and |
311 TaskResultSummary. | 330 TaskResultSummary. |
312 | 331 |
313 It is not meant to be instantiated on its own. | 332 It is not meant to be instantiated on its own. |
314 | 333 |
315 TODO(maruel): Overhaul this entity: | 334 TODO(maruel): Overhaul this entity: |
316 - Get rid of TaskOutput as it is not needed anymore (?) | 335 - Get rid of TaskOutput as it is not needed anymore (?) |
317 """ | 336 """ |
318 # Bot that ran this task. | 337 # Bot that ran this task. |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
366 # Children tasks that were triggered by this task. This is set when the task | 385 # Children tasks that were triggered by this task. This is set when the task |
367 # reentrantly creates other Swarming tasks. Note that the task_id is to a | 386 # reentrantly creates other Swarming tasks. Note that the task_id is to a |
368 # TaskResultSummary. | 387 # TaskResultSummary. |
369 children_task_ids = ndb.StringProperty( | 388 children_task_ids = ndb.StringProperty( |
370 validator=_validate_task_summary_id, repeated=True) | 389 validator=_validate_task_summary_id, repeated=True) |
371 | 390 |
372 # File outputs of the task. Only set if TaskRequest.properties.sources_ref is | 391 # File outputs of the task. Only set if TaskRequest.properties.sources_ref is |
373 # set. The isolateserver and namespace should match. | 392 # set. The isolateserver and namespace should match. |
374 outputs_ref = ndb.LocalStructuredProperty(task_request.FilesRef) | 393 outputs_ref = ndb.LocalStructuredProperty(task_request.FilesRef) |
375 | 394 |
395 # The pinned versions of all the CIPD packages used in the task. | |
396 cipd_pins = ndb.LocalStructuredProperty(CipdPins) | |
397 | |
376 @property | 398 @property |
377 def can_be_canceled(self): | 399 def can_be_canceled(self): |
378 """Returns True if the task is in a state that can be canceled.""" | 400 """Returns True if the task is in a state that can be canceled.""" |
379 # TOOD(maruel): To be able to add State.RUNNING, the following must be done: | 401 # TOOD(maruel): To be able to add State.RUNNING, the following must be done: |
380 # task_scheduler.cancel_task() must be strictly a transaction relative to | 402 # task_scheduler.cancel_task() must be strictly a transaction relative to |
381 # task_scheduler.bot_kill_task() and task_scheduler.bot_update_task(). | 403 # task_scheduler.bot_kill_task() and task_scheduler.bot_update_task(). |
382 # | 404 # |
383 # The tricky part is to keep this code performant. On the other hand, all | 405 # The tricky part is to keep this code performant. On the other hand, all |
384 # the entities under the transaction (TaskToRun, TaskResultSummary and | 406 # the entities under the transaction (TaskToRun, TaskResultSummary and |
385 # TaskRunResult) are under the same entity root, so it's definitely | 407 # TaskRunResult) are under the same entity root, so it's definitely |
(...skipping 171 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
557 present. | 579 present. |
558 """ | 580 """ |
559 if not self.run_result_key or not self.stdout_chunks: | 581 if not self.run_result_key or not self.stdout_chunks: |
560 # The task was not reaped or no output was streamed for this index yet. | 582 # The task was not reaped or no output was streamed for this index yet. |
561 raise ndb.Return(None) | 583 raise ndb.Return(None) |
562 | 584 |
563 output_key = _run_result_key_to_output_key(self.run_result_key) | 585 output_key = _run_result_key_to_output_key(self.run_result_key) |
564 out = yield TaskOutput.get_output_async(output_key, self.stdout_chunks) | 586 out = yield TaskOutput.get_output_async(output_key, self.stdout_chunks) |
565 raise ndb.Return(out) | 587 raise ndb.Return(out) |
566 | 588 |
589 def validate(self, request): | |
590 """Validation that requires the task_request. | |
591 | |
592 Full validation includes calling this method, and the checks in | |
593 _pre_put_hook. | |
594 | |
595 Raises ValueError if this is invalid, otherwise returns None. | |
596 """ | |
597 props = request.properties | |
598 | |
599 if props.cipd_input and self.cipd_pins: | |
600 check = cipd.PinChecker().check | |
M-A Ruel
2016/08/30 02:18:07
having check() a staticmethod makes this more sens
iannucci
2016/08/30 09:13:29
done
| |
601 check(props.cipd_input.client_package, self.cipd_pins.client_package) | |
602 if len(props.cipd_input.packages) != len(self.cipd_pins.packages): | |
603 raise ValueError('Mismatched package lengths') | |
604 for a, b in zip(props.cipd_input.packages, self.cipd_pins.packages): | |
605 check(a, b) | |
606 | |
567 def _pre_put_hook(self): | 607 def _pre_put_hook(self): |
568 """Use extra validation that cannot be validated throught 'validator'.""" | 608 """Use extra validation that cannot be validated throught 'validator'.""" |
569 super(_TaskResultCommon, self)._pre_put_hook() | 609 super(_TaskResultCommon, self)._pre_put_hook() |
570 if self.state == State.EXPIRED: | 610 if self.state == State.EXPIRED: |
571 if self.failure or self.exit_code is not None: | 611 if self.failure or self.exit_code is not None: |
572 raise datastore_errors.BadValueError( | 612 raise datastore_errors.BadValueError( |
573 'Unexpected State, a task can\'t fail if it hasn\'t started yet') | 613 'Unexpected State, a task can\'t fail if it hasn\'t started yet') |
574 | 614 |
575 if self.state == State.TIMED_OUT and not self.failure: | 615 if self.state == State.TIMED_OUT and not self.failure: |
576 raise datastore_errors.BadValueError('Timeout implies task failure') | 616 raise datastore_errors.BadValueError('Timeout implies task failure') |
(...skipping 198 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
775 return None | 815 return None |
776 return task_pack.result_summary_key_to_run_result_key( | 816 return task_pack.result_summary_key_to_run_result_key( |
777 self.key, self.try_number) | 817 self.key, self.try_number) |
778 | 818 |
779 @property | 819 @property |
780 def task_id(self): | 820 def task_id(self): |
781 return task_pack.pack_result_summary_key(self.key) | 821 return task_pack.pack_result_summary_key(self.key) |
782 | 822 |
783 def reset_to_pending(self): | 823 def reset_to_pending(self): |
784 """Resets this entity to pending state.""" | 824 """Resets this entity to pending state.""" |
825 self.cipd_pins = None | |
785 self.duration = None | 826 self.duration = None |
786 self.exit_code = None | 827 self.exit_code = None |
787 self.internal_failure = False | 828 self.internal_failure = False |
788 self.outputs_ref = None | 829 self.outputs_ref = None |
789 self.started_ts = None | 830 self.started_ts = None |
790 self.state = State.PENDING | 831 self.state = State.PENDING |
791 | 832 |
792 def set_from_run_result(self, run_result, request): | 833 def set_from_run_result(self, run_result, request): |
793 """Copies all the relevant properties from a TaskRunResult into this | 834 """Copies all the relevant properties from a TaskRunResult into this |
794 TaskResultSummary. | 835 TaskResultSummary. |
(...skipping 366 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1161 if tags: | 1202 if tags: |
1162 # Add TaskResultSummary indexes if desired. | 1203 # Add TaskResultSummary indexes if desired. |
1163 if sort != 'created_ts': | 1204 if sort != 'created_ts': |
1164 raise ValueError( | 1205 raise ValueError( |
1165 'Add needed indexes for sort:%s and tags if desired' % sort) | 1206 'Add needed indexes for sort:%s and tags if desired' % sort) |
1166 tags_filter = TaskResultSummary.tags == tags[0] | 1207 tags_filter = TaskResultSummary.tags == tags[0] |
1167 for tag in tags[1:]: | 1208 for tag in tags[1:]: |
1168 tags_filter = ndb.AND(tags_filter, TaskResultSummary.tags == tag) | 1209 tags_filter = ndb.AND(tags_filter, TaskResultSummary.tags == tag) |
1169 query = query.filter(tags_filter) | 1210 query = query.filter(tags_filter) |
1170 return _filter_query(TaskResultSummary, query, start, end, sort, state) | 1211 return _filter_query(TaskResultSummary, query, start, end, sort, state) |
OLD | NEW |