| OLD | NEW |
| 1 # coding: utf-8 | 1 # coding: utf-8 |
| 2 # Copyright 2014 The LUCI Authors. All rights reserved. | 2 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
| 5 | 5 |
| 6 """Tasks definition. | 6 """Tasks definition. |
| 7 | 7 |
| 8 Each user request creates a new TaskRequest. The TaskRequest instance saves the | 8 Each user request creates a new TaskRequest. The TaskRequest instance saves the |
| 9 metadata of the request, e.g. who requested it, when why, etc. It links to the | 9 metadata of the request, e.g. who requested it, when why, etc. It links to the |
| 10 actual data of the request in a TaskProperties. The TaskProperties represents | 10 actual data of the request in a TaskProperties. The TaskProperties represents |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 44 | | 44 | |
| 45 v | 45 v |
| 46 <See task_to_run.py and task_result.py> | 46 <See task_to_run.py and task_result.py> |
| 47 | 47 |
| 48 TaskProperties is embedded in TaskRequest. TaskProperties is still declared as a | 48 TaskProperties is embedded in TaskRequest. TaskProperties is still declared as a |
| 49 separate entity to clearly declare the boundary for task request deduplication. | 49 separate entity to clearly declare the boundary for task request deduplication. |
| 50 """ | 50 """ |
| 51 | 51 |
| 52 import datetime | 52 import datetime |
| 53 import hashlib | 53 import hashlib |
| 54 import itertools |
| 54 import posixpath | 55 import posixpath |
| 55 import random | 56 import random |
| 56 import re | 57 import re |
| 57 import urlparse | 58 import urlparse |
| 58 | 59 |
| 59 from google.appengine.api import datastore_errors | 60 from google.appengine.api import datastore_errors |
| 60 from google.appengine.ext import ndb | 61 from google.appengine.ext import ndb |
| 61 | 62 |
| 62 from components import auth | 63 from components import auth |
| 63 from components import datastore_utils | 64 from components import datastore_utils |
| (...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 143 """Validates TaskProperties.env.""" | 144 """Validates TaskProperties.env.""" |
| 144 # pylint: disable=W0212 | 145 # pylint: disable=W0212 |
| 145 if not all( | 146 if not all( |
| 146 isinstance(k, unicode) and isinstance(v, unicode) | 147 isinstance(k, unicode) and isinstance(v, unicode) |
| 147 for k, v in value.iteritems()): | 148 for k, v in value.iteritems()): |
| 148 # pylint: disable=W0212 | 149 # pylint: disable=W0212 |
| 149 raise TypeError( | 150 raise TypeError( |
| 150 '%s must be a dict of strings, not %r' % (prop._name, value)) | 151 '%s must be a dict of strings, not %r' % (prop._name, value)) |
| 151 | 152 |
| 152 | 153 |
| 153 def _validate_dimensions(prop, value): | 154 def _validate_dimension_flat(_prop, value): |
| 154 """Validates TaskProperties.dimensions.""" | 155 """Validates TaskProperties.dimensions_flat.""" |
| 155 # pylint: disable=W0212 | |
| 156 if not value: | 156 if not value: |
| 157 raise datastore_errors.BadValueError(u'%s must be specified' % prop._name) | 157 raise datastore_errors.BadValueError(u'dimensions must be specified') |
| 158 key, val = value.split(u':', 1) |
| 159 if not config.validate_dimension_key(key): |
| 160 raise datastore_errors.BadValueError(u'dimension %r isn\'t valid' % key) |
| 161 if not config.validate_dimension_value(val): |
| 162 raise datastore_errors.BadValueError( |
| 163 u'dimension %r:%r isn\'t valid' % (key, val)) |
| 164 |
| 165 |
| 166 def _validate_dimensions_dict(prop, value): |
| 167 """Validates old TaskProperties.dimensions_dict.""" |
| 158 _validate_dict_of_strings(prop, value) | 168 _validate_dict_of_strings(prop, value) |
| 159 for key, val in value.iteritems(): | 169 for key, val in value.iteritems(): |
| 160 if not config.validate_dimension_key(key): | 170 if not config.validate_dimension_key(key): |
| 161 raise datastore_errors.BadValueError(u'dimension %r isn\'t valid' % key) | 171 raise datastore_errors.BadValueError(u'dimension %r isn\'t valid' % key) |
| 162 if not config.validate_dimension_value(val): | 172 if not config.validate_dimension_value(val): |
| 163 raise datastore_errors.BadValueError( | 173 raise datastore_errors.BadValueError( |
| 164 u'dimension %r:%r isn\'t valid' % (key, val)) | 174 u'dimension %r:%r isn\'t valid' % (key, val)) |
| 165 if u'pool' not in value and u'id' not in value: | |
| 166 raise datastore_errors.BadValueError( | |
| 167 u'At least one of \'id\' or \'pool\' must be used as %s' % prop._name) | |
| 168 if len(value) > 64: | |
| 169 raise datastore_errors.BadValueError( | |
| 170 '%s can have up to 64 keys' % prop._name) | |
| 171 | 175 |
| 172 | 176 |
| 173 def _validate_env(prop, value): | 177 def _validate_env(prop, value): |
| 174 _validate_dict_of_strings(prop, value) | 178 _validate_dict_of_strings(prop, value) |
| 175 maxlen = 1024 | 179 maxlen = 1024 |
| 176 for k, v in value.iteritems(): | 180 for k, v in value.iteritems(): |
| 177 if not k: | 181 if not k: |
| 178 raise datastore_errors.BadValueError( | 182 raise datastore_errors.BadValueError( |
| 179 'valid key are required in %s' % prop._name) | 183 'valid key are required in %s' % prop._name) |
| 180 if len(k) > maxlen: | 184 if len(k) > maxlen: |
| (...skipping 300 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 481 # output too. See TODO at the top of this class. | 485 # output too. See TODO at the top of this class. |
| 482 # May be non-None even if task input is not isolated. | 486 # May be non-None even if task input is not isolated. |
| 483 # | 487 # |
| 484 # Only inputs_ref.isolated or command can be specified. | 488 # Only inputs_ref.isolated or command can be specified. |
| 485 inputs_ref = ndb.LocalStructuredProperty(FilesRef) | 489 inputs_ref = ndb.LocalStructuredProperty(FilesRef) |
| 486 | 490 |
| 487 # CIPD packages to install. | 491 # CIPD packages to install. |
| 488 cipd_input = ndb.LocalStructuredProperty(CipdInput) | 492 cipd_input = ndb.LocalStructuredProperty(CipdInput) |
| 489 | 493 |
| 490 # Filter to use to determine the required properties on the bot to run on. For | 494 # Filter to use to determine the required properties on the bot to run on. For |
| 491 # example, Windows or hostname. Encoded as json. Either 'pool' or 'id' | 495 # example, Windows or hostname. Either 'pool' or 'id' dimension is required. |
| 492 # dimension are required (see _validate_dimensions). | 496 dimensions_flat = ndb.StringProperty( |
| 493 dimensions = datastore_utils.DeterministicJsonProperty( | 497 validator=_validate_dimension_flat, indexed=False, repeated=True) |
| 494 validator=_validate_dimensions, json_type=dict, indexed=False) | 498 # Old way to express dimensions. Not used anymore as this doesn't support |
| 499 # repeated keys. |
| 500 dimensions_dict = datastore_utils.DeterministicJsonProperty( |
| 501 validator=_validate_dimensions_dict, json_type=dict, indexed=False, |
| 502 name='dimensions') |
| 495 | 503 |
| 496 # Environment variables. Encoded as json. Optional. | 504 # Environment variables. Encoded as json. Optional. |
| 497 env = datastore_utils.DeterministicJsonProperty( | 505 env = datastore_utils.DeterministicJsonProperty( |
| 498 validator=_validate_env, json_type=dict, indexed=False) | 506 validator=_validate_env, json_type=dict, indexed=False) |
| 499 | 507 |
| 500 # Maximum duration the bot can take to run this task. It's named hard_timeout | 508 # Maximum duration the bot can take to run this task. It's named hard_timeout |
| 501 # in the bot. | 509 # in the bot. |
| 502 execution_timeout_secs = ndb.IntegerProperty( | 510 execution_timeout_secs = ndb.IntegerProperty( |
| 503 validator=_validate_timeout, required=True, indexed=False) | 511 validator=_validate_timeout, required=True, indexed=False) |
| 504 | 512 |
| (...skipping 21 matching lines...) Expand all Loading... |
| 526 # $(ISOLATED_OUTDIR) will be returned; otherwise, the files in this list | 534 # $(ISOLATED_OUTDIR) will be returned; otherwise, the files in this list |
| 527 # will be added to those in that directory. | 535 # will be added to those in that directory. |
| 528 outputs = ndb.StringProperty(repeated=True, indexed=False, | 536 outputs = ndb.StringProperty(repeated=True, indexed=False, |
| 529 validator=_validate_output_path) | 537 validator=_validate_output_path) |
| 530 | 538 |
| 531 # If True, the TaskRequest embedding these TaskProperties has an associated | 539 # If True, the TaskRequest embedding these TaskProperties has an associated |
| 532 # SecretBytes entity. | 540 # SecretBytes entity. |
| 533 has_secret_bytes = ndb.BooleanProperty(default=False, indexed=False) | 541 has_secret_bytes = ndb.BooleanProperty(default=False, indexed=False) |
| 534 | 542 |
| 535 @property | 543 @property |
| 544 def dimensions(self): |
| 545 """Returns a dict(key, value).""" |
| 546 if self.dimensions_dict: |
| 547 return self.dimensions_dict |
| 548 return dict(i.split(u':', 1) for i in self.dimensions_flat) |
| 549 |
| 550 @property |
| 536 def is_terminate(self): | 551 def is_terminate(self): |
| 537 """If True, it is a terminate request.""" | 552 """If True, it is a terminate request.""" |
| 538 return ( | 553 return ( |
| 539 not self.caches and | 554 not self.caches and |
| 540 not self.command and | 555 not self.command and |
| 541 self.dimensions.keys() == [u'id'] and | 556 self.dimensions.keys() == [u'id'] and |
| 542 not (self.inputs_ref and self.inputs_ref.isolated) and | 557 not (self.inputs_ref and self.inputs_ref.isolated) and |
| 543 not self.cipd_input and | 558 not self.cipd_input and |
| 544 not self.env and | 559 not self.env and |
| 545 not self.execution_timeout_secs and | 560 not self.execution_timeout_secs and |
| 546 not self.extra_args and | 561 not self.extra_args and |
| 547 not self.grace_period_secs and | 562 not self.grace_period_secs and |
| 548 not self.io_timeout_secs and | 563 not self.io_timeout_secs and |
| 549 not self.idempotent and | 564 not self.idempotent and |
| 550 not self.outputs and | 565 not self.outputs and |
| 551 not self.has_secret_bytes) | 566 not self.has_secret_bytes) |
| 552 | 567 |
| 568 def to_dict(self): |
| 569 out = super(TaskProperties, self).to_dict( |
| 570 exclude=['dimensions_dict', 'dimensions_flat']) |
| 571 out['dimensions'] = self.dimensions |
| 572 return out |
| 573 |
| 553 def _pre_put_hook(self): | 574 def _pre_put_hook(self): |
| 554 super(TaskProperties, self)._pre_put_hook() | 575 super(TaskProperties, self)._pre_put_hook() |
| 555 if self.is_terminate: | 576 if self.is_terminate: |
| 556 # Most values are not valid with a terminate task. self.is_terminate | 577 # Most values are not valid with a terminate task. self.is_terminate |
| 557 # already check those. | 578 # already check those. |
| 558 return | 579 return |
| 559 | 580 |
| 581 # Dimensions. |
| 582 if not self.dimensions: |
| 583 raise datastore_errors.BadValueError(u'dimensions are required') |
| 584 keys = self.dimensions.keys() |
| 585 count_pool = sum(1 for k in keys if k == u'pool') |
| 586 count_id = sum(1 for k in keys if k == u'id') |
| 587 if not count_pool and not count_id: |
| 588 raise datastore_errors.BadValueError( |
| 589 u'At least one of \'id\' or \'pool\' must be used as dimensions') |
| 590 if count_id > 1: |
| 591 raise datastore_errors.BadValueError( |
| 592 u'\'id\' cannot be specified more than once in dimensions') |
| 593 if len(keys) > 64: |
| 594 raise datastore_errors.BadValueError( |
| 595 'dimensions can have up to 64 entries') |
| 596 if not _is_sorted(self.dimensions_flat): |
| 597 # Better check here than silently sort since operations (like calculating |
| 598 # dimensions_hash) is done before storage, which would cause silent |
| 599 # corruption. |
| 600 raise datastore_errors.BadValueError( |
| 601 u'internal error: dimensions_flat must be sorted') |
| 602 |
| 603 # Isolated input and commands. |
| 560 isolated_input = self.inputs_ref and self.inputs_ref.isolated | 604 isolated_input = self.inputs_ref and self.inputs_ref.isolated |
| 561 if not self.command and not isolated_input: | 605 if not self.command and not isolated_input: |
| 562 raise datastore_errors.BadValueError( | 606 raise datastore_errors.BadValueError( |
| 563 'use at least one of command or inputs_ref.isolated') | 607 'use at least one of command or inputs_ref.isolated') |
| 564 if self.command and self.extra_args: | 608 if self.command and self.extra_args: |
| 565 raise datastore_errors.BadValueError( | 609 raise datastore_errors.BadValueError( |
| 566 'can\'t use both command and extra_args') | 610 'can\'t use both command and extra_args') |
| 567 if self.extra_args and not isolated_input: | 611 if self.extra_args and not isolated_input: |
| 568 raise datastore_errors.BadValueError( | 612 raise datastore_errors.BadValueError( |
| 569 'extra_args require inputs_ref.isolated') | 613 'extra_args require inputs_ref.isolated') |
| (...skipping 189 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 759 raise datastore_errors.BadValueError( | 803 raise datastore_errors.BadValueError( |
| 760 'bad pubsub topic name - %s' % self.pubsub_topic) | 804 'bad pubsub topic name - %s' % self.pubsub_topic) |
| 761 if self.pubsub_auth_token and not self.pubsub_topic: | 805 if self.pubsub_auth_token and not self.pubsub_topic: |
| 762 raise datastore_errors.BadValueError( | 806 raise datastore_errors.BadValueError( |
| 763 'pubsub_auth_token requires pubsub_topic') | 807 'pubsub_auth_token requires pubsub_topic') |
| 764 if self.pubsub_userdata and not self.pubsub_topic: | 808 if self.pubsub_userdata and not self.pubsub_topic: |
| 765 raise datastore_errors.BadValueError( | 809 raise datastore_errors.BadValueError( |
| 766 'pubsub_userdata requires pubsub_topic') | 810 'pubsub_userdata requires pubsub_topic') |
| 767 | 811 |
| 768 | 812 |
| 813 ### Private stuff. |
| 814 |
| 815 |
| 816 def _is_sorted(l): |
| 817 return all(a <= b for a, b in itertools.izip(l[:-1], l[1:])) |
| 818 |
| 819 |
| 769 ### Public API. | 820 ### Public API. |
| 770 | 821 |
| 771 | 822 |
| 772 def create_termination_task(bot_id, allow_high_priority): | 823 def create_termination_task(bot_id, allow_high_priority): |
| 773 """Returns a task to terminate the given bot. | 824 """Returns a task to terminate the given bot. |
| 774 | 825 |
| 775 Returns: | 826 Returns: |
| 776 TaskRequest for priority 0 (highest) termination task. | 827 TaskRequest for priority 0 (highest) termination task. |
| 777 """ | 828 """ |
| 778 properties = TaskProperties( | 829 properties = TaskProperties( |
| 779 dimensions={u'id': unicode(bot_id)}, | 830 dimensions_dict={u'id': unicode(bot_id)}, |
| 780 execution_timeout_secs=0, | 831 execution_timeout_secs=0, |
| 781 grace_period_secs=0, | 832 grace_period_secs=0, |
| 782 io_timeout_secs=0) | 833 io_timeout_secs=0) |
| 783 now = utils.utcnow() | 834 now = utils.utcnow() |
| 784 request = TaskRequest( | 835 request = TaskRequest( |
| 785 created_ts=now, | 836 created_ts=now, |
| 786 expiration_ts=now + datetime.timedelta(days=1), | 837 expiration_ts=now + datetime.timedelta(days=1), |
| 787 name=u'Terminate %s' % bot_id, | 838 name=u'Terminate %s' % bot_id, |
| 788 priority=0, | 839 priority=0, |
| 789 properties=properties, | 840 properties=properties, |
| (...skipping 187 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 977 New request uses same 'service_account_token', but 'init_new_request' will | 1028 New request uses same 'service_account_token', but 'init_new_request' will |
| 978 recheck it again, ensuring the current user (the one that triggers the retry) | 1029 recheck it again, ensuring the current user (the one that triggers the retry) |
| 979 is able to use the token. | 1030 is able to use the token. |
| 980 | 1031 |
| 981 Returns: | 1032 Returns: |
| 982 The newly created TaskRequest. | 1033 The newly created TaskRequest. |
| 983 """ | 1034 """ |
| 984 now = utils.utcnow() | 1035 now = utils.utcnow() |
| 985 if original_request.properties.is_terminate: | 1036 if original_request.properties.is_terminate: |
| 986 raise ValueError('cannot clone a terminate request') | 1037 raise ValueError('cannot clone a terminate request') |
| 987 properties = TaskProperties(**original_request.properties.to_dict()) | 1038 orig_props = original_request.properties.to_dict() |
| 1039 dimensions_dict = orig_props.pop(u'dimensions') |
| 1040 properties = TaskProperties(dimensions_dict=dimensions_dict, **orig_props) |
| 988 properties.idempotent = False | 1041 properties.idempotent = False |
| 989 expiration_ts = ( | 1042 expiration_ts = ( |
| 990 now + (original_request.expiration_ts - original_request.created_ts)) | 1043 now + (original_request.expiration_ts - original_request.created_ts)) |
| 991 name = original_request.name | 1044 name = original_request.name |
| 992 match = re.match(r'^(.*) \(Retry #(\d+)\)$', name) | 1045 match = re.match(r'^(.*) \(Retry #(\d+)\)$', name) |
| 993 if match: | 1046 if match: |
| 994 name = '%s (Retry #%d)' % (match.group(1), int(match.group(2)) + 1) | 1047 name = '%s (Retry #%d)' % (match.group(1), int(match.group(2)) + 1) |
| 995 else: | 1048 else: |
| 996 name += ' (Retry #1)' | 1049 name += ' (Retry #1)' |
| 997 user = auth.get_current_identity() | 1050 user = auth.get_current_identity() |
| (...skipping 16 matching lines...) Expand all Loading... |
| 1014 init_new_request(request, allow_high_priority, original_secret_bytes) | 1067 init_new_request(request, allow_high_priority, original_secret_bytes) |
| 1015 return request | 1068 return request |
| 1016 | 1069 |
| 1017 | 1070 |
| 1018 def validate_priority(priority): | 1071 def validate_priority(priority): |
| 1019 """Throws ValueError if priority is not a valid value.""" | 1072 """Throws ValueError if priority is not a valid value.""" |
| 1020 if 0 > priority or MAXIMUM_PRIORITY < priority: | 1073 if 0 > priority or MAXIMUM_PRIORITY < priority: |
| 1021 raise datastore_errors.BadValueError( | 1074 raise datastore_errors.BadValueError( |
| 1022 'priority (%d) must be between 0 and %d (inclusive)' % | 1075 'priority (%d) must be between 0 and %d (inclusive)' % |
| 1023 (priority, MAXIMUM_PRIORITY)) | 1076 (priority, MAXIMUM_PRIORITY)) |
| OLD | NEW |