Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(91)

Side by Side Diff: appengine/swarming/server/task_request.py

Issue 2928823002: Make backward-forward compatible version. (Closed)
Patch Set: Created 3 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 # coding: utf-8 1 # coding: utf-8
2 # Copyright 2014 The LUCI Authors. All rights reserved. 2 # Copyright 2014 The LUCI Authors. All rights reserved.
3 # Use of this source code is governed under the Apache License, Version 2.0 3 # Use of this source code is governed under the Apache License, Version 2.0
4 # that can be found in the LICENSE file. 4 # that can be found in the LICENSE file.
5 5
6 """Tasks definition. 6 """Tasks definition.
7 7
8 Each user request creates a new TaskRequest. The TaskRequest instance saves the 8 Each user request creates a new TaskRequest. The TaskRequest instance saves the
9 metadata of the request, e.g. who requested it, when why, etc. It links to the 9 metadata of the request, e.g. who requested it, when why, etc. It links to the
10 actual data of the request in a TaskProperties. The TaskProperties represents 10 actual data of the request in a TaskProperties. The TaskProperties represents
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
44 | 44 |
45 v 45 v
46 <See task_to_run.py and task_result.py> 46 <See task_to_run.py and task_result.py>
47 47
48 TaskProperties is embedded in TaskRequest. TaskProperties is still declared as a 48 TaskProperties is embedded in TaskRequest. TaskProperties is still declared as a
49 separate entity to clearly declare the boundary for task request deduplication. 49 separate entity to clearly declare the boundary for task request deduplication.
50 """ 50 """
51 51
52 import datetime 52 import datetime
53 import hashlib 53 import hashlib
54 import itertools
54 import posixpath 55 import posixpath
55 import random 56 import random
56 import re 57 import re
57 import urlparse 58 import urlparse
58 59
59 from google.appengine.api import datastore_errors 60 from google.appengine.api import datastore_errors
60 from google.appengine.ext import ndb 61 from google.appengine.ext import ndb
61 62
62 from components import auth 63 from components import auth
63 from components import datastore_utils 64 from components import datastore_utils
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
143 """Validates TaskProperties.env.""" 144 """Validates TaskProperties.env."""
144 # pylint: disable=W0212 145 # pylint: disable=W0212
145 if not all( 146 if not all(
146 isinstance(k, unicode) and isinstance(v, unicode) 147 isinstance(k, unicode) and isinstance(v, unicode)
147 for k, v in value.iteritems()): 148 for k, v in value.iteritems()):
148 # pylint: disable=W0212 149 # pylint: disable=W0212
149 raise TypeError( 150 raise TypeError(
150 '%s must be a dict of strings, not %r' % (prop._name, value)) 151 '%s must be a dict of strings, not %r' % (prop._name, value))
151 152
152 153
153 def _validate_dimensions(prop, value): 154 def _validate_dimension_flat(_prop, value):
154 """Validates TaskProperties.dimensions.""" 155 """Validates TaskProperties.dimensions_flat."""
155 # pylint: disable=W0212
156 if not value: 156 if not value:
157 raise datastore_errors.BadValueError(u'%s must be specified' % prop._name) 157 raise datastore_errors.BadValueError(u'dimensions must be specified')
158 key, val = value.split(u':', 1)
159 if not config.validate_dimension_key(key):
160 raise datastore_errors.BadValueError(u'dimension %r isn\'t valid' % key)
161 if not config.validate_dimension_value(val):
162 raise datastore_errors.BadValueError(
163 u'dimension %r:%r isn\'t valid' % (key, val))
164
165
166 def _validate_dimensions_dict(prop, value):
167 """Validates old TaskProperties.dimensions_dict."""
158 _validate_dict_of_strings(prop, value) 168 _validate_dict_of_strings(prop, value)
159 for key, val in value.iteritems(): 169 for key, val in value.iteritems():
160 if not config.validate_dimension_key(key): 170 if not config.validate_dimension_key(key):
161 raise datastore_errors.BadValueError(u'dimension %r isn\'t valid' % key) 171 raise datastore_errors.BadValueError(u'dimension %r isn\'t valid' % key)
162 if not config.validate_dimension_value(val): 172 if not config.validate_dimension_value(val):
163 raise datastore_errors.BadValueError( 173 raise datastore_errors.BadValueError(
164 u'dimension %r:%r isn\'t valid' % (key, val)) 174 u'dimension %r:%r isn\'t valid' % (key, val))
165 if u'pool' not in value and u'id' not in value:
166 raise datastore_errors.BadValueError(
167 u'At least one of \'id\' or \'pool\' must be used as %s' % prop._name)
168 if len(value) > 64:
169 raise datastore_errors.BadValueError(
170 '%s can have up to 64 keys' % prop._name)
171 175
172 176
173 def _validate_env(prop, value): 177 def _validate_env(prop, value):
174 _validate_dict_of_strings(prop, value) 178 _validate_dict_of_strings(prop, value)
175 maxlen = 1024 179 maxlen = 1024
176 for k, v in value.iteritems(): 180 for k, v in value.iteritems():
177 if not k: 181 if not k:
178 raise datastore_errors.BadValueError( 182 raise datastore_errors.BadValueError(
179 'valid key are required in %s' % prop._name) 183 'valid key are required in %s' % prop._name)
180 if len(k) > maxlen: 184 if len(k) > maxlen:
(...skipping 300 matching lines...) Expand 10 before | Expand all | Expand 10 after
481 # output too. See TODO at the top of this class. 485 # output too. See TODO at the top of this class.
482 # May be non-None even if task input is not isolated. 486 # May be non-None even if task input is not isolated.
483 # 487 #
484 # Only inputs_ref.isolated or command can be specified. 488 # Only inputs_ref.isolated or command can be specified.
485 inputs_ref = ndb.LocalStructuredProperty(FilesRef) 489 inputs_ref = ndb.LocalStructuredProperty(FilesRef)
486 490
487 # CIPD packages to install. 491 # CIPD packages to install.
488 cipd_input = ndb.LocalStructuredProperty(CipdInput) 492 cipd_input = ndb.LocalStructuredProperty(CipdInput)
489 493
490 # Filter to use to determine the required properties on the bot to run on. For 494 # Filter to use to determine the required properties on the bot to run on. For
491 # example, Windows or hostname. Encoded as json. Either 'pool' or 'id' 495 # example, Windows or hostname. Either 'pool' or 'id' dimension is required.
492 # dimension are required (see _validate_dimensions). 496 dimensions_flat = ndb.StringProperty(
493 dimensions = datastore_utils.DeterministicJsonProperty( 497 validator=_validate_dimension_flat, indexed=False, repeated=True)
494 validator=_validate_dimensions, json_type=dict, indexed=False) 498 # Old way to express dimensions. Not used anymore as this doesn't support
499 # repeated keys.
500 dimensions_dict = datastore_utils.DeterministicJsonProperty(
501 validator=_validate_dimensions_dict, json_type=dict, indexed=False,
502 name='dimensions')
495 503
496 # Environment variables. Encoded as json. Optional. 504 # Environment variables. Encoded as json. Optional.
497 env = datastore_utils.DeterministicJsonProperty( 505 env = datastore_utils.DeterministicJsonProperty(
498 validator=_validate_env, json_type=dict, indexed=False) 506 validator=_validate_env, json_type=dict, indexed=False)
499 507
500 # Maximum duration the bot can take to run this task. It's named hard_timeout 508 # Maximum duration the bot can take to run this task. It's named hard_timeout
501 # in the bot. 509 # in the bot.
502 execution_timeout_secs = ndb.IntegerProperty( 510 execution_timeout_secs = ndb.IntegerProperty(
503 validator=_validate_timeout, required=True, indexed=False) 511 validator=_validate_timeout, required=True, indexed=False)
504 512
(...skipping 21 matching lines...) Expand all
526 # $(ISOLATED_OUTDIR) will be returned; otherwise, the files in this list 534 # $(ISOLATED_OUTDIR) will be returned; otherwise, the files in this list
527 # will be added to those in that directory. 535 # will be added to those in that directory.
528 outputs = ndb.StringProperty(repeated=True, indexed=False, 536 outputs = ndb.StringProperty(repeated=True, indexed=False,
529 validator=_validate_output_path) 537 validator=_validate_output_path)
530 538
531 # If True, the TaskRequest embedding these TaskProperties has an associated 539 # If True, the TaskRequest embedding these TaskProperties has an associated
532 # SecretBytes entity. 540 # SecretBytes entity.
533 has_secret_bytes = ndb.BooleanProperty(default=False, indexed=False) 541 has_secret_bytes = ndb.BooleanProperty(default=False, indexed=False)
534 542
535 @property 543 @property
544 def dimensions(self):
545 """Returns a dict(key, value)."""
546 if self.dimensions_dict:
547 return self.dimensions_dict
548 return dict(i.split(u':', 1) for i in self.dimensions_flat)
549
550 @property
536 def is_terminate(self): 551 def is_terminate(self):
537 """If True, it is a terminate request.""" 552 """If True, it is a terminate request."""
538 return ( 553 return (
539 not self.caches and 554 not self.caches and
540 not self.command and 555 not self.command and
541 self.dimensions.keys() == [u'id'] and 556 self.dimensions.keys() == [u'id'] and
542 not (self.inputs_ref and self.inputs_ref.isolated) and 557 not (self.inputs_ref and self.inputs_ref.isolated) and
543 not self.cipd_input and 558 not self.cipd_input and
544 not self.env and 559 not self.env and
545 not self.execution_timeout_secs and 560 not self.execution_timeout_secs and
546 not self.extra_args and 561 not self.extra_args and
547 not self.grace_period_secs and 562 not self.grace_period_secs and
548 not self.io_timeout_secs and 563 not self.io_timeout_secs and
549 not self.idempotent and 564 not self.idempotent and
550 not self.outputs and 565 not self.outputs and
551 not self.has_secret_bytes) 566 not self.has_secret_bytes)
552 567
568 def to_dict(self):
569 out = super(TaskProperties, self).to_dict(
570 exclude=['dimensions_dict', 'dimensions_flat'])
571 out['dimensions'] = self.dimensions
572 return out
573
553 def _pre_put_hook(self): 574 def _pre_put_hook(self):
554 super(TaskProperties, self)._pre_put_hook() 575 super(TaskProperties, self)._pre_put_hook()
555 if self.is_terminate: 576 if self.is_terminate:
556 # Most values are not valid with a terminate task. self.is_terminate 577 # Most values are not valid with a terminate task. self.is_terminate
557 # already check those. 578 # already check those.
558 return 579 return
559 580
581 # Dimensions.
582 if not self.dimensions:
583 raise datastore_errors.BadValueError(u'dimensions are required')
584 keys = self.dimensions.keys()
585 count_pool = sum(1 for k in keys if k == u'pool')
586 count_id = sum(1 for k in keys if k == u'id')
587 if not count_pool and not count_id:
588 raise datastore_errors.BadValueError(
589 u'At least one of \'id\' or \'pool\' must be used as dimensions')
590 if count_id > 1:
591 raise datastore_errors.BadValueError(
592 u'\'id\' cannot be specified more than once in dimensions')
593 if len(keys) > 64:
594 raise datastore_errors.BadValueError(
595 'dimensions can have up to 64 entries')
596 if not _is_sorted(self.dimensions_flat):
597 # Better check here than silently sort since operations (like calculating
598 # dimensions_hash) is done before storage, which would cause silent
599 # corruption.
600 raise datastore_errors.BadValueError(
601 u'internal error: dimensions_flat must be sorted')
602
603 # Isolated input and commands.
560 isolated_input = self.inputs_ref and self.inputs_ref.isolated 604 isolated_input = self.inputs_ref and self.inputs_ref.isolated
561 if not self.command and not isolated_input: 605 if not self.command and not isolated_input:
562 raise datastore_errors.BadValueError( 606 raise datastore_errors.BadValueError(
563 'use at least one of command or inputs_ref.isolated') 607 'use at least one of command or inputs_ref.isolated')
564 if self.command and self.extra_args: 608 if self.command and self.extra_args:
565 raise datastore_errors.BadValueError( 609 raise datastore_errors.BadValueError(
566 'can\'t use both command and extra_args') 610 'can\'t use both command and extra_args')
567 if self.extra_args and not isolated_input: 611 if self.extra_args and not isolated_input:
568 raise datastore_errors.BadValueError( 612 raise datastore_errors.BadValueError(
569 'extra_args require inputs_ref.isolated') 613 'extra_args require inputs_ref.isolated')
(...skipping 189 matching lines...) Expand 10 before | Expand all | Expand 10 after
759 raise datastore_errors.BadValueError( 803 raise datastore_errors.BadValueError(
760 'bad pubsub topic name - %s' % self.pubsub_topic) 804 'bad pubsub topic name - %s' % self.pubsub_topic)
761 if self.pubsub_auth_token and not self.pubsub_topic: 805 if self.pubsub_auth_token and not self.pubsub_topic:
762 raise datastore_errors.BadValueError( 806 raise datastore_errors.BadValueError(
763 'pubsub_auth_token requires pubsub_topic') 807 'pubsub_auth_token requires pubsub_topic')
764 if self.pubsub_userdata and not self.pubsub_topic: 808 if self.pubsub_userdata and not self.pubsub_topic:
765 raise datastore_errors.BadValueError( 809 raise datastore_errors.BadValueError(
766 'pubsub_userdata requires pubsub_topic') 810 'pubsub_userdata requires pubsub_topic')
767 811
768 812
813 ### Private stuff.
814
815
816 def _is_sorted(l):
817 return all(a <= b for a, b in itertools.izip(l[:-1], l[1:]))
818
819
769 ### Public API. 820 ### Public API.
770 821
771 822
772 def create_termination_task(bot_id, allow_high_priority): 823 def create_termination_task(bot_id, allow_high_priority):
773 """Returns a task to terminate the given bot. 824 """Returns a task to terminate the given bot.
774 825
775 Returns: 826 Returns:
776 TaskRequest for priority 0 (highest) termination task. 827 TaskRequest for priority 0 (highest) termination task.
777 """ 828 """
778 properties = TaskProperties( 829 properties = TaskProperties(
779 dimensions={u'id': unicode(bot_id)}, 830 dimensions_dict={u'id': unicode(bot_id)},
780 execution_timeout_secs=0, 831 execution_timeout_secs=0,
781 grace_period_secs=0, 832 grace_period_secs=0,
782 io_timeout_secs=0) 833 io_timeout_secs=0)
783 now = utils.utcnow() 834 now = utils.utcnow()
784 request = TaskRequest( 835 request = TaskRequest(
785 created_ts=now, 836 created_ts=now,
786 expiration_ts=now + datetime.timedelta(days=1), 837 expiration_ts=now + datetime.timedelta(days=1),
787 name=u'Terminate %s' % bot_id, 838 name=u'Terminate %s' % bot_id,
788 priority=0, 839 priority=0,
789 properties=properties, 840 properties=properties,
(...skipping 187 matching lines...) Expand 10 before | Expand all | Expand 10 after
977 New request uses same 'service_account_token', but 'init_new_request' will 1028 New request uses same 'service_account_token', but 'init_new_request' will
978 recheck it again, ensuring the current user (the one that triggers the retry) 1029 recheck it again, ensuring the current user (the one that triggers the retry)
979 is able to use the token. 1030 is able to use the token.
980 1031
981 Returns: 1032 Returns:
982 The newly created TaskRequest. 1033 The newly created TaskRequest.
983 """ 1034 """
984 now = utils.utcnow() 1035 now = utils.utcnow()
985 if original_request.properties.is_terminate: 1036 if original_request.properties.is_terminate:
986 raise ValueError('cannot clone a terminate request') 1037 raise ValueError('cannot clone a terminate request')
987 properties = TaskProperties(**original_request.properties.to_dict()) 1038 orig_props = original_request.properties.to_dict()
1039 dimensions_dict = orig_props.pop(u'dimensions')
1040 properties = TaskProperties(dimensions_dict=dimensions_dict, **orig_props)
988 properties.idempotent = False 1041 properties.idempotent = False
989 expiration_ts = ( 1042 expiration_ts = (
990 now + (original_request.expiration_ts - original_request.created_ts)) 1043 now + (original_request.expiration_ts - original_request.created_ts))
991 name = original_request.name 1044 name = original_request.name
992 match = re.match(r'^(.*) \(Retry #(\d+)\)$', name) 1045 match = re.match(r'^(.*) \(Retry #(\d+)\)$', name)
993 if match: 1046 if match:
994 name = '%s (Retry #%d)' % (match.group(1), int(match.group(2)) + 1) 1047 name = '%s (Retry #%d)' % (match.group(1), int(match.group(2)) + 1)
995 else: 1048 else:
996 name += ' (Retry #1)' 1049 name += ' (Retry #1)'
997 user = auth.get_current_identity() 1050 user = auth.get_current_identity()
(...skipping 16 matching lines...) Expand all
1014 init_new_request(request, allow_high_priority, original_secret_bytes) 1067 init_new_request(request, allow_high_priority, original_secret_bytes)
1015 return request 1068 return request
1016 1069
1017 1070
1018 def validate_priority(priority): 1071 def validate_priority(priority):
1019 """Throws ValueError if priority is not a valid value.""" 1072 """Throws ValueError if priority is not a valid value."""
1020 if 0 > priority or MAXIMUM_PRIORITY < priority: 1073 if 0 > priority or MAXIMUM_PRIORITY < priority:
1021 raise datastore_errors.BadValueError( 1074 raise datastore_errors.BadValueError(
1022 'priority (%d) must be between 0 and %d (inclusive)' % 1075 'priority (%d) must be between 0 and %d (inclusive)' %
1023 (priority, MAXIMUM_PRIORITY)) 1076 (priority, MAXIMUM_PRIORITY))
OLDNEW
« no previous file with comments | « appengine/swarming/server/task_queues_test.py ('k') | appengine/swarming/server/task_request_test.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698