| OLD | NEW |
| 1 # coding: utf-8 | 1 # coding: utf-8 |
| 2 # Copyright 2014 The LUCI Authors. All rights reserved. | 2 # Copyright 2014 The LUCI Authors. All rights reserved. |
| 3 # Use of this source code is governed under the Apache License, Version 2.0 | 3 # Use of this source code is governed under the Apache License, Version 2.0 |
| 4 # that can be found in the LICENSE file. | 4 # that can be found in the LICENSE file. |
| 5 | 5 |
| 6 """Tasks definition. | 6 """Tasks definition. |
| 7 | 7 |
| 8 Each user request creates a new TaskRequest. The TaskRequest instance saves the | 8 Each user request creates a new TaskRequest. The TaskRequest instance saves the |
| 9 metadata of the request, e.g. who requested it, when why, etc. It links to the | 9 metadata of the request, e.g. who requested it, when why, etc. It links to the |
| 10 actual data of the request in a TaskProperties. The TaskProperties represents | 10 actual data of the request in a TaskProperties. The TaskProperties represents |
| (...skipping 524 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 535 # will be added to those in that directory. | 535 # will be added to those in that directory. |
| 536 outputs = ndb.StringProperty(repeated=True, indexed=False, | 536 outputs = ndb.StringProperty(repeated=True, indexed=False, |
| 537 validator=_validate_output_path) | 537 validator=_validate_output_path) |
| 538 | 538 |
| 539 # If True, the TaskRequest embedding these TaskProperties has an associated | 539 # If True, the TaskRequest embedding these TaskProperties has an associated |
| 540 # SecretBytes entity. | 540 # SecretBytes entity. |
| 541 has_secret_bytes = ndb.BooleanProperty(default=False, indexed=False) | 541 has_secret_bytes = ndb.BooleanProperty(default=False, indexed=False) |
| 542 | 542 |
| 543 @property | 543 @property |
| 544 def dimensions(self): | 544 def dimensions(self): |
| 545 """Returns a dict(key, value).""" | 545 """Returns a list of tuple(key, value).""" |
| 546 if self.dimensions_dict: | 546 if self.dimensions_dict: |
| 547 return self.dimensions_dict | 547 return sorted(self.dimensions_dict.items()) |
| 548 return dict(i.split(u':', 1) for i in self.dimensions_flat) | 548 # Data is already sorted. |
| 549 return [tuple(i.split(u':', 1)) for i in self.dimensions_flat] |
| 549 | 550 |
| 550 @property | 551 @property |
| 551 def is_terminate(self): | 552 def is_terminate(self): |
| 552 """If True, it is a terminate request.""" | 553 """If True, it is a terminate request.""" |
| 553 return ( | 554 return ( |
| 554 not self.caches and | 555 not self.caches and |
| 555 not self.command and | 556 not self.command and |
| 556 self.dimensions.keys() == [u'id'] and | 557 [i[0] for i in self.dimensions] == [u'id'] and |
| 557 not (self.inputs_ref and self.inputs_ref.isolated) and | 558 not (self.inputs_ref and self.inputs_ref.isolated) and |
| 558 not self.cipd_input and | 559 not self.cipd_input and |
| 559 not self.env and | 560 not self.env and |
| 560 not self.execution_timeout_secs and | 561 not self.execution_timeout_secs and |
| 561 not self.extra_args and | 562 not self.extra_args and |
| 562 not self.grace_period_secs and | 563 not self.grace_period_secs and |
| 563 not self.io_timeout_secs and | 564 not self.io_timeout_secs and |
| 564 not self.idempotent and | 565 not self.idempotent and |
| 565 not self.outputs and | 566 not self.outputs and |
| 566 not self.has_secret_bytes) | 567 not self.has_secret_bytes) |
| 567 | 568 |
| 568 def to_dict(self): | 569 def to_dict(self): |
| 569 out = super(TaskProperties, self).to_dict( | 570 out = super(TaskProperties, self).to_dict( |
| 570 exclude=['dimensions_dict', 'dimensions_flat']) | 571 exclude=['dimensions_dict', 'dimensions_flat']) |
| 571 out['dimensions'] = self.dimensions | 572 out['dimensions'] = self.dimensions |
| 572 return out | 573 return out |
| 573 | 574 |
| 574 def _pre_put_hook(self): | 575 def _pre_put_hook(self): |
| 575 super(TaskProperties, self)._pre_put_hook() | 576 super(TaskProperties, self)._pre_put_hook() |
| 576 if self.is_terminate: | 577 if self.is_terminate: |
| 577 # Most values are not valid with a terminate task. self.is_terminate | 578 # Most values are not valid with a terminate task. self.is_terminate |
| 578 # already check those. | 579 # already check those. |
| 579 return | 580 return |
| 580 | 581 |
| 581 # Dimensions. | 582 # Dimensions. |
| 582 if not self.dimensions: | 583 if self.dimensions_dict: |
| 584 raise datastore_errors.BadValueError( |
| 585 u'internal error: dimensions_dict cannot be stored anymore') |
| 586 if not self.dimensions_flat: |
| 583 raise datastore_errors.BadValueError(u'dimensions are required') | 587 raise datastore_errors.BadValueError(u'dimensions are required') |
| 584 keys = self.dimensions.keys() | 588 keys = [i.split(u':', 1)[0] for i in self.dimensions_flat] |
| 585 count_pool = sum(1 for k in keys if k == u'pool') | 589 count_pool = sum(1 for k in keys if k == u'pool') |
| 586 count_id = sum(1 for k in keys if k == u'id') | 590 count_id = sum(1 for k in keys if k == u'id') |
| 587 if not count_pool and not count_id: | 591 if not count_pool and not count_id: |
| 588 raise datastore_errors.BadValueError( | 592 raise datastore_errors.BadValueError( |
| 589 u'At least one of \'id\' or \'pool\' must be used as dimensions') | 593 u'At least one of \'id\' or \'pool\' must be used as dimensions') |
| 590 if count_id > 1: | 594 if count_id > 1: |
| 591 raise datastore_errors.BadValueError( | 595 raise datastore_errors.BadValueError( |
| 592 u'\'id\' cannot be specified more than once in dimensions') | 596 u'\'id\' cannot be specified more than once in dimensions') |
| 593 if len(keys) > 64: | 597 if len(self.dimensions_flat) > 64: |
| 594 raise datastore_errors.BadValueError( | 598 raise datastore_errors.BadValueError( |
| 595 'dimensions can have up to 64 entries') | 599 'dimensions can have up to 64 entries') |
| 596 if not _is_sorted(self.dimensions_flat): | 600 if not _is_sorted(self.dimensions_flat): |
| 597 # Better check here than silently sort since operations (like calculating | 601 # Better check here than silently sort since operations (like calculating |
| 598 # dimensions_hash) is done before storage, which would cause silent | 602 # dimensions_hash) is done before storage, which would cause silent |
| 599 # corruption. | 603 # corruption. |
| 600 raise datastore_errors.BadValueError( | 604 raise datastore_errors.BadValueError( |
| 601 u'internal error: dimensions_flat must be sorted') | 605 u'internal error: dimensions_flat must be sorted') |
| 602 | 606 |
| 603 # Isolated input and commands. | 607 # Isolated input and commands. |
| (...skipping 216 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 820 ### Public API. | 824 ### Public API. |
| 821 | 825 |
| 822 | 826 |
| 823 def create_termination_task(bot_id, allow_high_priority): | 827 def create_termination_task(bot_id, allow_high_priority): |
| 824 """Returns a task to terminate the given bot. | 828 """Returns a task to terminate the given bot. |
| 825 | 829 |
| 826 Returns: | 830 Returns: |
| 827 TaskRequest for priority 0 (highest) termination task. | 831 TaskRequest for priority 0 (highest) termination task. |
| 828 """ | 832 """ |
| 829 properties = TaskProperties( | 833 properties = TaskProperties( |
| 830 dimensions_dict={u'id': unicode(bot_id)}, | 834 dimensions_flat=[u'id:%s' % unicode(bot_id)], |
| 831 execution_timeout_secs=0, | 835 execution_timeout_secs=0, |
| 832 grace_period_secs=0, | 836 grace_period_secs=0, |
| 833 io_timeout_secs=0) | 837 io_timeout_secs=0) |
| 834 now = utils.utcnow() | 838 now = utils.utcnow() |
| 835 request = TaskRequest( | 839 request = TaskRequest( |
| 836 created_ts=now, | 840 created_ts=now, |
| 837 expiration_ts=now + datetime.timedelta(days=1), | 841 expiration_ts=now + datetime.timedelta(days=1), |
| 838 name=u'Terminate %s' % bot_id, | 842 name=u'Terminate %s' % bot_id, |
| 839 priority=0, | 843 priority=0, |
| 840 properties=properties, | 844 properties=properties, |
| (...skipping 146 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 987 if request.service_account_token == 'bot': | 991 if request.service_account_token == 'bot': |
| 988 request.service_account = 'bot' | 992 request.service_account = 'bot' |
| 989 else: | 993 else: |
| 990 # TODO(vadimsh): Check the token signature, verify it can be used by the | 994 # TODO(vadimsh): Check the token signature, verify it can be used by the |
| 991 # current user, extract service account email. | 995 # current user, extract service account email. |
| 992 raise auth.AuthorizationError('service_account_token is not implemented') | 996 raise auth.AuthorizationError('service_account_token is not implemented') |
| 993 | 997 |
| 994 request.tags.append('priority:%s' % request.priority) | 998 request.tags.append('priority:%s' % request.priority) |
| 995 request.tags.append('user:%s' % request.user) | 999 request.tags.append('user:%s' % request.user) |
| 996 request.tags.append('service_account:%s' % request.service_account) | 1000 request.tags.append('service_account:%s' % request.service_account) |
| 997 for key, value in request.properties.dimensions.iteritems(): | 1001 for key, value in request.properties.dimensions: |
| 998 request.tags.append('%s:%s' % (key, value)) | 1002 request.tags.append('%s:%s' % (key, value)) |
| 999 request.tags = sorted(set(request.tags)) | 1003 request.tags = sorted(set(request.tags)) |
| 1000 | 1004 |
| 1001 if request.properties.idempotent: | 1005 if request.properties.idempotent: |
| 1002 props = request.properties.to_dict() | 1006 props = request.properties.to_dict() |
| 1003 if secret_bytes_ent is not None: | 1007 if secret_bytes_ent is not None: |
| 1004 props['secret_bytes'] = secret_bytes_ent.secret_bytes.encode('hex') | 1008 props['secret_bytes'] = secret_bytes_ent.secret_bytes.encode('hex') |
| 1005 request.properties_hash = request.HASHING_ALGO( | 1009 request.properties_hash = request.HASHING_ALGO( |
| 1006 utils.encode_to_json(props)).digest() | 1010 utils.encode_to_json(props)).digest() |
| 1007 else: | 1011 else: |
| (...skipping 21 matching lines...) Expand all Loading... |
| 1029 recheck it again, ensuring the current user (the one that triggers the retry) | 1033 recheck it again, ensuring the current user (the one that triggers the retry) |
| 1030 is able to use the token. | 1034 is able to use the token. |
| 1031 | 1035 |
| 1032 Returns: | 1036 Returns: |
| 1033 The newly created TaskRequest. | 1037 The newly created TaskRequest. |
| 1034 """ | 1038 """ |
| 1035 now = utils.utcnow() | 1039 now = utils.utcnow() |
| 1036 if original_request.properties.is_terminate: | 1040 if original_request.properties.is_terminate: |
| 1037 raise ValueError('cannot clone a terminate request') | 1041 raise ValueError('cannot clone a terminate request') |
| 1038 orig_props = original_request.properties.to_dict() | 1042 orig_props = original_request.properties.to_dict() |
| 1039 dimensions_dict = orig_props.pop(u'dimensions') | 1043 dimensions_flat = [ |
| 1040 properties = TaskProperties(dimensions_dict=dimensions_dict, **orig_props) | 1044 u'%s:%s' % (k, v) for k, v in orig_props.pop(u'dimensions') |
| 1045 ] |
| 1046 properties = TaskProperties(dimensions_flat=dimensions_flat, **orig_props) |
| 1041 properties.idempotent = False | 1047 properties.idempotent = False |
| 1042 expiration_ts = ( | 1048 expiration_ts = ( |
| 1043 now + (original_request.expiration_ts - original_request.created_ts)) | 1049 now + (original_request.expiration_ts - original_request.created_ts)) |
| 1044 name = original_request.name | 1050 name = original_request.name |
| 1045 match = re.match(r'^(.*) \(Retry #(\d+)\)$', name) | 1051 match = re.match(r'^(.*) \(Retry #(\d+)\)$', name) |
| 1046 if match: | 1052 if match: |
| 1047 name = '%s (Retry #%d)' % (match.group(1), int(match.group(2)) + 1) | 1053 name = '%s (Retry #%d)' % (match.group(1), int(match.group(2)) + 1) |
| 1048 else: | 1054 else: |
| 1049 name += ' (Retry #1)' | 1055 name += ' (Retry #1)' |
| 1050 user = auth.get_current_identity() | 1056 user = auth.get_current_identity() |
| (...skipping 16 matching lines...) Expand all Loading... |
| 1067 init_new_request(request, allow_high_priority, original_secret_bytes) | 1073 init_new_request(request, allow_high_priority, original_secret_bytes) |
| 1068 return request | 1074 return request |
| 1069 | 1075 |
| 1070 | 1076 |
| 1071 def validate_priority(priority): | 1077 def validate_priority(priority): |
| 1072 """Throws ValueError if priority is not a valid value.""" | 1078 """Throws ValueError if priority is not a valid value.""" |
| 1073 if 0 > priority or MAXIMUM_PRIORITY < priority: | 1079 if 0 > priority or MAXIMUM_PRIORITY < priority: |
| 1074 raise datastore_errors.BadValueError( | 1080 raise datastore_errors.BadValueError( |
| 1075 'priority (%d) must be between 0 and %d (inclusive)' % | 1081 'priority (%d) must be between 0 and %d (inclusive)' % |
| 1076 (priority, MAXIMUM_PRIORITY)) | 1082 (priority, MAXIMUM_PRIORITY)) |
| OLD | NEW |