| Index: appengine/components/components/config/remote.py
|
| diff --git a/appengine/components/components/config/remote.py b/appengine/components/components/config/remote.py
|
| index c776a2c2ab426c8ed11e8db37af2b80a1d3fbd13..467c598a6d9f38f3f026bbda33fb2329f34860b4 100644
|
| --- a/appengine/components/components/config/remote.py
|
| +++ b/appengine/components/components/config/remote.py
|
| @@ -9,6 +9,12 @@ import datetime
|
| import logging
|
| import urllib
|
|
|
| +# Config component is using google.protobuf package, it requires some python
|
| +# package magic hacking.
|
| +from components import utils
|
| +utils.fix_protobuf_package()
|
| +
|
| +from google import protobuf
|
| from google.appengine.ext import ndb
|
|
|
| from components import net
|
| @@ -34,7 +40,9 @@ class LastGoodConfig(ndb.Model):
|
| Root entity. Id is "<config_set>:<path>".
|
| """
|
| content = ndb.BlobProperty()
|
| + content_binary = ndb.BlobProperty()
|
| content_hash = ndb.StringProperty()
|
| + proto_message_name = ndb.StringProperty()
|
| revision = ndb.StringProperty()
|
| last_access_ts = ndb.DateTimeProperty()
|
|
|
| @@ -115,10 +123,12 @@ class Provider(object):
|
| if content_hash and use_memcache:
|
| yield ctx.memcache_set(
|
| cache_key, (revision, content_hash), time=60 if get_latest else 0)
|
| - raise ndb.Return((revision, content_hash))
|
| + raise ndb.Return(revision, content_hash)
|
|
|
| @ndb.tasklet
|
| - def get_async(self, config_set, path, revision=None, store_last_good=None):
|
| + def get_async(
|
| + self, config_set, path, revision=None, dest_type=None,
|
| + store_last_good=None):
|
| """Returns tuple (revision, content).
|
|
|
| If not found, returns (None, None).
|
| @@ -129,15 +139,16 @@ class Provider(object):
|
| assert path
|
|
|
| if store_last_good:
|
| - last_good = yield _get_last_good_async(config_set, path)
|
| - raise ndb.Return((last_good.revision, last_good.content))
|
| + result = yield _get_last_good_async(config_set, path, dest_type)
|
| + raise ndb.Return(result)
|
|
|
| revision, content_hash = yield self.get_config_hash_async(
|
| config_set, path, revision=revision)
|
| content = None
|
| if content_hash:
|
| content = yield self.get_config_by_hash_async(content_hash)
|
| - raise ndb.Return((revision, content))
|
| + config = common._convert_config(content, dest_type)
|
| + raise ndb.Return(revision, config)
|
|
|
| @ndb.tasklet
|
| def _get_configs_multi(self, url_path):
|
| @@ -223,7 +234,10 @@ class Provider(object):
|
| logging.warning(
|
| 'Could not fetch hash of latest %s', config_key.id())
|
| return
|
| - if current.revision == revision:
|
| +
|
| + binary_missing = (
|
| + current.proto_message_name and not current.content_binary)
|
| + if current.revision == revision and not binary_missing:
|
| assert current.content_hash == content_hash
|
| return
|
|
|
| @@ -243,17 +257,33 @@ class Provider(object):
|
| 'Invalid config %s:%s@%s is ignored', config_set, path, revision)
|
| return
|
|
|
| + # content may be None if we think that it matches what we have locally.
|
| +
|
| @ndb.transactional_tasklet
|
| def update():
|
| config = yield config_key.get_async()
|
| config.revision = revision
|
| if config.content_hash != content_hash:
|
| if content is None:
|
| + # Content hash matched before we started the transaction.
|
| # Config was updated between content_hash was resolved and
|
| - # the transaction has started. Do nothing.
|
| + # the transaction has started. Do nothing, next cron run will
|
| + # get a new hash.
|
| return
|
| config.content_hash = content_hash
|
| config.content = content
|
| + config.content_binary = None # Invalidate to refresh below.
|
| +
|
| + if config.proto_message_name and not config.content_binary:
|
| + try:
|
| + config.content_binary = _content_to_binary(
|
| + config.proto_message_name, config.content)
|
| + except common.ConfigFormatError:
|
| + logging.exception(
|
| + 'Invalid config %s:%s@%s is ignored', config_set, path,
|
| + revision)
|
| + return
|
| +
|
| yield config.put_async()
|
| logging.info(
|
| 'Updated last good config %s to %s',
|
| @@ -261,28 +291,84 @@ class Provider(object):
|
| yield update()
|
|
|
|
|
| +def _content_to_binary(proto_message_name, content):
|
| + try:
|
| + dest_type = protobuf.symbol_database.Default().GetSymbol(
|
| + proto_message_name)
|
| + except KeyError:
|
| + logging.exception(
|
| + 'Could not load message type %s. Skipping binary serialization',
|
| + proto_message_name)
|
| + return None
|
| + return common._convert_config(content, dest_type).SerializeToString()
|
| +
|
| +
|
| @ndb.tasklet
|
| -def _get_last_good_async(config_set, path):
|
| - """Fetches LastGoodConfig and updates last_access_ts if needed."""
|
| +def _get_last_good_async(config_set, path, dest_type):
|
| + """Returns last good (rev, config) and updates last_access_ts if needed."""
|
| now = utils.utcnow()
|
| - # By inserting an entity we tell a Cron job to fetch it from a config service.
|
| last_good_id = '%s:%s' % (config_set, path)
|
| - last_good = yield LastGoodConfig.get_or_insert_async(
|
| - last_good_id, last_access_ts=now)
|
|
|
| - # Update last_access_ts, but not on each call.
|
| - if (last_good.last_access_ts is None or
|
| - now - last_good.last_access_ts > UPDATE_LAST_ACCESS_TIME_FREQUENCY):
|
| + proto_message_name = None
|
| + if dest_type and issubclass(dest_type, protobuf.message.Message):
|
| + proto_message_name = dest_type.DESCRIPTOR.full_name
|
| + try:
|
| + protobuf.symbol_database.Default().GetSymbol(proto_message_name)
|
| + except KeyError: # pragma: no cover
|
| + logging.exception(
|
| + 'Recompile %s proto message with the latest protoc',
|
| + proto_message_name)
|
| + proto_message_name = None
|
| +
|
| + last_good = yield LastGoodConfig.get_by_id_async(last_good_id)
|
| +
|
| + # If entity does not exist, or its last_access_ts wasn't updated for a while
|
| + # or its proto_message_name is not up to date, then update the entity.
|
| + if (not last_good or
|
| + not last_good.last_access_ts or
|
| + now - last_good.last_access_ts > UPDATE_LAST_ACCESS_TIME_FREQUENCY or
|
| + last_good.proto_message_name != proto_message_name):
|
| # pylint does not like this usage of transactional_tasklet
|
| # pylint: disable=no-value-for-parameter
|
| @ndb.transactional_tasklet(propagation=ndb.TransactionOptions.INDEPENDENT)
|
| def update():
|
| - last_good = yield LastGoodConfig.get_or_insert_async(
|
| - last_good_id, last_access_ts=now)
|
| + last_good = yield LastGoodConfig.get_by_id_async(last_good_id)
|
| + last_good = last_good or LastGoodConfig(id=last_good_id)
|
| last_good.last_access_ts = now
|
| + if last_good.proto_message_name != proto_message_name:
|
| + last_good.content_binary = None
|
| + last_good.proto_message_name = proto_message_name
|
| yield last_good.put_async()
|
| yield update()
|
| - raise ndb.Return(last_good)
|
| +
|
| + if not last_good or not last_good.revision:
|
| + # The config wasn't loaded yet.
|
| + raise ndb.Return(None, None)
|
| +
|
| + force_text = False
|
| + if last_good.proto_message_name != proto_message_name:
|
| + logging.error(
|
| + ('Config message type for %s:%s differs in the datastore (%s) and in '
|
| + 'the code (%s). We have updated the cron job to parse configs using '
|
| + 'new message type, so this error should disappear soon. '
|
| + 'If it persists, check logs of the cron job that updates the configs.'
|
| + ),
|
| + config_set, path, last_good.proto_message_name,
|
| + proto_message_name)
|
| + # Since the message type is not necessarily the same, it is safer to
|
| + # unsuccessfully load config as text than successfully load a binary config
|
| + # of an entirely different message type.
|
| + force_text = True
|
| +
|
| + cfg = None
|
| + if proto_message_name:
|
| + if not last_good.content_binary or force_text:
|
| + logging.warning('loading a proto config from text, not binary')
|
| + else:
|
| + cfg = dest_type()
|
| + cfg.MergeFromString(last_good.content_binary)
|
| + cfg = cfg or common._convert_config(last_good.content, dest_type)
|
| + raise ndb.Return(last_good.revision, cfg)
|
|
|
|
|
| def format_url(url_format, *args):
|
|
|