Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(167)

Unified Diff: appengine/components/components/config/remote.py

Issue 2778533002: config: store binary configs (Closed)
Patch Set: addressed comments Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/components/components/config/remote.py
diff --git a/appengine/components/components/config/remote.py b/appengine/components/components/config/remote.py
index c776a2c2ab426c8ed11e8db37af2b80a1d3fbd13..467c598a6d9f38f3f026bbda33fb2329f34860b4 100644
--- a/appengine/components/components/config/remote.py
+++ b/appengine/components/components/config/remote.py
@@ -9,6 +9,12 @@ import datetime
import logging
import urllib
+# Config component is using google.protobuf package, it requires some python
+# package magic hacking.
+from components import utils
+utils.fix_protobuf_package()
+
+from google import protobuf
from google.appengine.ext import ndb
from components import net
@@ -34,7 +40,9 @@ class LastGoodConfig(ndb.Model):
Root entity. Id is "<config_set>:<path>".
"""
content = ndb.BlobProperty()
+ content_binary = ndb.BlobProperty()
content_hash = ndb.StringProperty()
+ proto_message_name = ndb.StringProperty()
revision = ndb.StringProperty()
last_access_ts = ndb.DateTimeProperty()
@@ -115,10 +123,12 @@ class Provider(object):
if content_hash and use_memcache:
yield ctx.memcache_set(
cache_key, (revision, content_hash), time=60 if get_latest else 0)
- raise ndb.Return((revision, content_hash))
+ raise ndb.Return(revision, content_hash)
@ndb.tasklet
- def get_async(self, config_set, path, revision=None, store_last_good=None):
+ def get_async(
+ self, config_set, path, revision=None, dest_type=None,
+ store_last_good=None):
"""Returns tuple (revision, content).
If not found, returns (None, None).
@@ -129,15 +139,16 @@ class Provider(object):
assert path
if store_last_good:
- last_good = yield _get_last_good_async(config_set, path)
- raise ndb.Return((last_good.revision, last_good.content))
+ result = yield _get_last_good_async(config_set, path, dest_type)
+ raise ndb.Return(result)
revision, content_hash = yield self.get_config_hash_async(
config_set, path, revision=revision)
content = None
if content_hash:
content = yield self.get_config_by_hash_async(content_hash)
- raise ndb.Return((revision, content))
+ config = common._convert_config(content, dest_type)
+ raise ndb.Return(revision, config)
@ndb.tasklet
def _get_configs_multi(self, url_path):
@@ -223,7 +234,10 @@ class Provider(object):
logging.warning(
'Could not fetch hash of latest %s', config_key.id())
return
- if current.revision == revision:
+
+ binary_missing = (
+ current.proto_message_name and not current.content_binary)
+ if current.revision == revision and not binary_missing:
assert current.content_hash == content_hash
return
@@ -243,17 +257,33 @@ class Provider(object):
'Invalid config %s:%s@%s is ignored', config_set, path, revision)
return
+ # content may be None if we think that it matches what we have locally.
+
@ndb.transactional_tasklet
def update():
config = yield config_key.get_async()
config.revision = revision
if config.content_hash != content_hash:
if content is None:
+ # Content hash matched before we started the transaction.
# Config was updated between content_hash was resolved and
- # the transaction has started. Do nothing.
+ # the transaction has started. Do nothing, next cron run will
+ # get a new hash.
return
config.content_hash = content_hash
config.content = content
+ config.content_binary = None # Invalidate to refresh below.
+
+ if config.proto_message_name and not config.content_binary:
+ try:
+ config.content_binary = _content_to_binary(
+ config.proto_message_name, config.content)
+ except common.ConfigFormatError:
+ logging.exception(
+ 'Invalid config %s:%s@%s is ignored', config_set, path,
+ revision)
+ return
+
yield config.put_async()
logging.info(
'Updated last good config %s to %s',
@@ -261,28 +291,84 @@ class Provider(object):
yield update()
+def _content_to_binary(proto_message_name, content):
+ try:
+ dest_type = protobuf.symbol_database.Default().GetSymbol(
+ proto_message_name)
+ except KeyError:
+ logging.exception(
+ 'Could not load message type %s. Skipping binary serialization',
+ proto_message_name)
+ return None
+ return common._convert_config(content, dest_type).SerializeToString()
+
+
@ndb.tasklet
-def _get_last_good_async(config_set, path):
- """Fetches LastGoodConfig and updates last_access_ts if needed."""
+def _get_last_good_async(config_set, path, dest_type):
+ """Returns last good (rev, config) and updates last_access_ts if needed."""
now = utils.utcnow()
- # By inserting an entity we tell a Cron job to fetch it from a config service.
last_good_id = '%s:%s' % (config_set, path)
- last_good = yield LastGoodConfig.get_or_insert_async(
- last_good_id, last_access_ts=now)
- # Update last_access_ts, but not on each call.
- if (last_good.last_access_ts is None or
- now - last_good.last_access_ts > UPDATE_LAST_ACCESS_TIME_FREQUENCY):
+ proto_message_name = None
+ if dest_type and issubclass(dest_type, protobuf.message.Message):
+ proto_message_name = dest_type.DESCRIPTOR.full_name
+ try:
+ protobuf.symbol_database.Default().GetSymbol(proto_message_name)
+ except KeyError: # pragma: no cover
+ logging.exception(
+ 'Recompile %s proto message with the latest protoc',
+ proto_message_name)
+ proto_message_name = None
+
+ last_good = yield LastGoodConfig.get_by_id_async(last_good_id)
+
+ # If entity does not exist, or its last_access_ts wasn't updated for a while
+ # or its proto_message_name is not up to date, then update the entity.
+ if (not last_good or
+ not last_good.last_access_ts or
+ now - last_good.last_access_ts > UPDATE_LAST_ACCESS_TIME_FREQUENCY or
+ last_good.proto_message_name != proto_message_name):
# pylint does not like this usage of transactional_tasklet
# pylint: disable=no-value-for-parameter
@ndb.transactional_tasklet(propagation=ndb.TransactionOptions.INDEPENDENT)
def update():
- last_good = yield LastGoodConfig.get_or_insert_async(
- last_good_id, last_access_ts=now)
+ last_good = yield LastGoodConfig.get_by_id_async(last_good_id)
+ last_good = last_good or LastGoodConfig(id=last_good_id)
last_good.last_access_ts = now
+ if last_good.proto_message_name != proto_message_name:
+ last_good.content_binary = None
+ last_good.proto_message_name = proto_message_name
yield last_good.put_async()
yield update()
- raise ndb.Return(last_good)
+
+ if not last_good or not last_good.revision:
+ # The config wasn't loaded yet.
+ raise ndb.Return(None, None)
+
+ force_text = False
+ if last_good.proto_message_name != proto_message_name:
+ logging.error(
+ ('Config message type for %s:%s differs in the datastore (%s) and in '
+ 'the code (%s). We have updated the cron job to parse configs using '
+ 'new message type, so this error should disappear soon. '
+ 'If it persists, check logs of the cron job that updates the configs.'
+ ),
+ config_set, path, last_good.proto_message_name,
+ proto_message_name)
+ # Since the message type is not necessarily the same, it is safer to
+ # unsuccessfully load config as text than successfully load a binary config
+ # of an entirely different message type.
+ force_text = True
+
+ cfg = None
+ if proto_message_name:
+ if not last_good.content_binary or force_text:
+ logging.warning('loading a proto config from text, not binary')
+ else:
+ cfg = dest_type()
+ cfg.MergeFromString(last_good.content_binary)
+ cfg = cfg or common._convert_config(last_good.content, dest_type)
+ raise ndb.Return(last_good.revision, cfg)
def format_url(url_format, *args):

Powered by Google App Engine
This is Rietveld 408576698