Chromium Code Reviews| Index: appengine/gaeconfig/ds.go |
| diff --git a/appengine/gaeconfig/ds.go b/appengine/gaeconfig/ds.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..dbde642122cb0789ec4295327e8d87b29a03039a |
| --- /dev/null |
| +++ b/appengine/gaeconfig/ds.go |
| @@ -0,0 +1,272 @@ |
| +// Copyright 2016 The LUCI Authors. All rights reserved. |
| +// Use of this source code is governed under the Apache License, Version 2.0 |
| +// that can be found in the LICENSE file. |
| + |
| +package gaeconfig |
| + |
| +import ( |
| + "time" |
| + |
| + "github.com/luci/luci-go/appengine/datastorecache" |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/errors" |
| + log "github.com/luci/luci-go/common/logging" |
| + "github.com/luci/luci-go/server/config" |
| + "github.com/luci/luci-go/server/config/access" |
| + "github.com/luci/luci-go/server/config/caching" |
| + |
| + "golang.org/x/net/context" |
| +) |
| + |
const (
	// dsCacheSchema is the schema version for datastore-cached values. Entries
	// whose Schema doesn't match are rejected on read (see cacheGet) and
	// rewritten with the current schema on refresh (see dsCacheHandler.Refresh).
	dsCacheSchema = "v1"

	// dsRPCDeadline is the deadline applied to config service RPCs.
	//
	// NOTE(review): not referenced in this chunk — confirm it is used elsewhere.
	dsRPCDeadline = 10 * time.Minute
)

// dsHandlerKey is the Context key under which a datastorecache.Handler is
// installed (withHandler) and retrieved (getCacheHandler). The key used is
// this variable's address, so it cannot collide with other packages' keys.
var dsHandlerKey = "github.com/luci/luci-go/appengine/gaeconfig.dsHandlerKey"
| + |
| +func getCacheHandler(c context.Context) datastorecache.Handler { |
| + v, _ := c.Value(&dsHandlerKey).(datastorecache.Handler) |
| + return v |
| +} |
| + |
// dsCache is our registered datastore cache, bound to our Config Handler.
// The HandlerFunc is used by the cache manager task to get a Handler
// instance (installed in the Context by withHandler) during refresh.
var dsCache = datastorecache.Cache{
	Name:                 "github.com/luci/luci-go/appengine/gaeconfig",
	AccessUpdateInterval: 24 * time.Hour,
	PruneFactor:          4,
	Parallel:             16,
	HandlerFunc:          getCacheHandler,
}
| + |
// dsCacheBackend is an interface around datastorecache.Cache functionality
// used by datastoreCache.cacheGet.
//
// We specialize this in testing to swap in other cache backends.
type dsCacheBackend interface {
	Get(c context.Context, key []byte) (datastorecache.Value, error)
}
| + |
// datastoreCache supplies a datastore-backed caching layer for a
// config.Backend (see getBackend).
type datastoreCache struct {
	// refreshInterval is forwarded to the dsCacheHandler constructed by
	// withHandler; it is returned by that Handler's RefreshInterval.
	refreshInterval time.Duration
	// failOpen is forwarded to dsCacheHandler.FailOpen; its inverse is used
	// as the caching.Backend's HardFailure flag (see getBackend).
	failOpen bool

	// userProjAccess is a cache of the current user's project access lookups.
	userProjAccess map[string]bool
	// anonProjAccess is a cache of anonymous project access lookups.
	anonProjAccess map[string]bool

	// cache, if not nil, overrides the default datastore cache (dsCache).
	// This is set for testing.
	cache dsCacheBackend
}
| + |
| +func (dc *datastoreCache) getBackend(base config.Backend) config.Backend { |
| + return &caching.Backend{ |
| + Backend: base, |
| + HardFailure: !dc.failOpen, |
| + CacheGet: dc.cacheGet, |
| + } |
| +} |
| + |
// withHandler installs a datastorecache.Handler into our Context. This is
// used during dsCache.Refresh calls.
//
// The Handler binds the parameters and Loader to the resolution call. For
// service resolution, this will be the Loader that is provided by the caching
// layer. For cron refresh, this will be the generic Loader provided by
// datastoreCronLoader.
| +func (dc *datastoreCache) withHandler(c context.Context, l caching.Loader, timeout time.Duration) context.Context { |
| + handler := dsCacheHandler{ |
| + refreshInterval: dc.refreshInterval, |
| + failOpen: dc.failOpen, |
| + loader: l, |
| + loaderTimeout: timeout, |
| + } |
| + return context.WithValue(c, &dsHandlerKey, &handler) |
| +} |
| + |
// cacheGet is installed as the caching.Backend's CacheGet function (see
// getBackend). It resolves "key" through the datastore cache — always caching
// at service-level authority — and then enforces the original caller's
// authority on the cached result before returning it.
func (dc *datastoreCache) cacheGet(c context.Context, key caching.Key, l caching.Loader) (
	*caching.Value, error) {

	// Use the production datastore cache unless a test backend was injected.
	cache := dc.cache
	if cache == nil {
		cache = &dsCache
	}

	// Modify our cache key to always refresh AsService (ACLs will be asserted on
	// load) and request full-content.
	origAuthority := key.Authority
	key.Authority = config.AsService

	// Pre-operation checks and adjustments, for operations whose access can be
	// denied based on the input parameters alone (before any cache lookup).
	switch key.Op {
	case caching.OpGet, caching.OpGetAll:
		// Always ask for full content.
		key.Content = true

	case caching.OpConfigSetURL:
		if err := access.Check(c, origAuthority, key.ConfigSet); err != nil {
			// An empty "URL" field in the returned Value signals "no access".
			return &caching.Value{}, nil
		}
	}

	// Encode our caching key, and use this for our datastore cache key.
	//
	// This gets recoded in dsCacheHandler's "Refresh" to identify the cache
	// operation that is being performed.
	encKey, err := caching.Encode(&key)
	if err != nil {
		return nil, errors.Annotate(err).Reason("failed to encode cache key").Err()
	}

	// Fetch through the cache, with our Handler installed in the Context so
	// that a missing/stale entry can be resolved via the supplied Loader.
	// No loader timeout is applied here (0); timeouts are for cron refresh.
	v, err := cache.Get(dc.withHandler(c, l, 0), encKey)
	if err != nil {
		return nil, err
	}

	// Decode our response. Reject values written under a different schema.
	if v.Schema != dsCacheSchema {
		return nil, errors.Reason("response schema (%(resp)q) doesn't match current (%(cur)q)").
			D("resp", v.Schema).D("cur", dsCacheSchema).Err()
	}

	cacheValue, err := caching.DecodeValue(v.Data)
	if err != nil {
		return nil, errors.Annotate(err).Reason("failed to decode cached value").Err()
	}

	// Prune any responses that are not permitted for the supplied Authority.
	switch key.Op {
	case caching.OpGet, caching.OpGetAll:
		if len(cacheValue.Items) > 0 {
			// Shift over any elements that can't be accessed (in-place filter).
			ptr := 0
			for _, itm := range cacheValue.Items {
				if dc.accessConfigSet(c, origAuthority, itm.ConfigSet) {
					cacheValue.Items[ptr] = itm
					ptr++
				}
			}
			cacheValue.Items = cacheValue.Items[:ptr]
		}
	}

	return cacheValue, nil
}
| + |
| +func (dc *datastoreCache) accessConfigSet(c context.Context, a config.Authority, configSet string) bool { |
| + var cacheMap *map[string]bool |
| + switch a { |
| + case config.AsService: |
| + return true |
| + case config.AsUser: |
| + cacheMap = &dc.userProjAccess |
| + default: |
| + cacheMap = &dc.anonProjAccess |
| + } |
| + |
| + // If we've already cached this project access, return the cached value. |
| + if v, ok := (*cacheMap)[configSet]; ok { |
| + return v |
| + } |
| + |
| + // Perform a soft access check. |
| + canAccess := false |
| + switch err := access.Check(c, a, configSet); err { |
| + case nil: |
| + canAccess = true |
| + |
| + case access.ErrNoAccess: |
| + // No access. |
| + break |
| + case config.ErrNoConfig: |
| + log.Fields{ |
| + "configSet": configSet, |
| + }.Debugf(c, "Checking access to project without a config.") |
| + default: |
| + log.Fields{ |
| + log.ErrorKey: err, |
| + "configSet": configSet, |
| + }.Warningf(c, "Error checking for project access.") |
| + } |
| + |
| + // Cache the result for future lookups. |
| + if *cacheMap == nil { |
| + *cacheMap = make(map[string]bool) |
| + } |
| + (*cacheMap)[configSet] = canAccess |
| + return canAccess |
| +} |
| + |
// dsCacheValue is a JSON-serializable pairing of a cache Key and its Value.
//
// NOTE(review): not referenced elsewhere in this file — confirm it is used
// by a sibling file (or tests), otherwise it can be removed.
type dsCacheValue struct {
	// Key is the cache Key.
	Key caching.Key `json:"k"`
	// Value is the cache Value.
	Value *caching.Value `json:"v"`
}
| + |
// dsCacheHandler is the datastorecache Handler used to refresh cached config
// values. Instances are constructed and installed into the Context by
// datastoreCache.withHandler, then retrieved via getCacheHandler.
type dsCacheHandler struct {
	// failOpen is returned verbatim by FailOpen.
	failOpen bool
	// refreshInterval is returned by RefreshInterval for every key.
	refreshInterval time.Duration
	// loader performs the actual value load during Refresh.
	loader caching.Loader

	// loaderTimeout, if >0, will be applied prior to performing the loader
	// operation. This is used for cron operations.
	loaderTimeout time.Duration
}
| + |
// FailOpen implements datastorecache.Handler.
func (dch *dsCacheHandler) FailOpen() bool { return dch.failOpen }

// RefreshInterval implements datastorecache.Handler; the configured interval
// is used regardless of the key.
func (dch *dsCacheHandler) RefreshInterval([]byte) time.Duration { return dch.refreshInterval }
| + |
// Refresh implements datastorecache.Handler. The cache invokes it to
// (re)compute the cached value for "key", which is a caching.Key encoded by
// cacheGet.
//
// If a previously cached value exists under the current schema, it is decoded
// and passed to the Loader — presumably so the Loader can reuse it; confirm
// against caching.Loader's contract. The loaded result is re-encoded and
// stamped with the current schema.
func (dch *dsCacheHandler) Refresh(c context.Context, key []byte, v datastorecache.Value) (datastorecache.Value, error) {
	// Decode the key into our caching key.
	var ck caching.Key
	if err := caching.Decode(key, &ck); err != nil {
		return v, errors.Annotate(err).Reason("failed to decode cache key").Err()
	}

	var cv *caching.Value
	if v.Schema == dsCacheSchema && len(v.Data) > 0 {
		// We have a currently-cached value, so decode it into "cv".
		var err error
		if cv, err = caching.DecodeValue(v.Data); err != nil {
			return v, errors.Annotate(err).Reason("failed to decode cache value").Err()
		}
	}

	// Apply our timeout, if configured (influences urlfetch). Only cron
	// refreshes set loaderTimeout (see withHandler / datastoreCronLoader).
	if dch.loaderTimeout > 0 {
		var cancelFunc context.CancelFunc
		c, cancelFunc = clock.WithTimeout(c, dch.loaderTimeout)
		defer cancelFunc()
	}

	// Perform a cache load on this value.
	cv, err := dch.loader(c, ck, cv)
	if err != nil {
		return v, errors.Annotate(err).Reason("failed to load cache value").Err()
	}

	// Encode the resulting cache value and stamp the current schema so future
	// readers (cacheGet) accept it.
	if v.Data, err = cv.Encode(); err != nil {
		return v, errors.Annotate(err).Reason("failed to encode cache value").Err()
	}
	v.Schema = dsCacheSchema
	v.Description = ck.String()
	return v, nil
}
| + |
| +// datastoreCronLoader returns a caching.Loader implementation to be used |
| +// by the Cron task. |
| +func datastoreCronLoader(b config.Backend) caching.Loader { |
| + return func(c context.Context, k caching.Key, v *caching.Value) (*caching.Value, error) { |
| + return caching.CacheLoad(c, b, k, v) |
| + } |
| +} |