Index: filter/dscache/dscache.go |
diff --git a/filter/dscache/dscache.go b/filter/dscache/dscache.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..6e1c15369a52397e091eb98b4484c05a787e3034 |
--- /dev/null |
+++ b/filter/dscache/dscache.go |
@@ -0,0 +1,209 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package dscache |
+ |
+import ( |
+ "bytes" |
+ "crypto/sha1" |
+ "encoding/base64" |
+ "fmt" |
+ "sync" |
+ "time" |
+ |
+ "github.com/luci/gae/service/datastore" |
+ "github.com/luci/gae/service/info" |
+ "github.com/luci/gae/service/memcache" |
+ "github.com/luci/luci-go/common/clock" |
+ "golang.org/x/net/context" |
+) |
+ |
+var ( |
+ // InstanceEnabledStatic allows you to statically (e.g. in an init() function) |
+ // bypass this filter by setting it to false. This takes effect when the |
+ // application calls IsGloballyEnabled. |
+ InstanceEnabledStatic = true |
+ |
+ // LockTimeSeconds is the number of seconds that a "lock" memcache entry will |
+ // have its expiration set to. It's set to just over half of the frontend |
+ // request handler timeout (currently 60 seconds). |
+ LockTimeSeconds = 31 |
+ |
+ // CacheTimeSeconds is the default number of seconds that a positively cached |
+ // entity will be retained (memcache contention notwithstanding). A value of |
+ // 0 is infinite. |
+ CacheTimeSeconds int64 = int64((time.Hour * 24).Seconds()) |
dnj
2015/08/07 16:30:06
WDYT about just making this a "time.Duration" w/ t
iannucci
2015/08/07 19:41:00
I mentioned this elsewhere, but memcache truncates
|
+ |
+ // CompressionThreshold is the number of bytes of entity value after which |
+ // compression kicks in. |
+ CompressionThreshold = 860 |
+ |
+ // DefaultShards is the default number of key sharding to do. |
+ DefaultShards int = 1 |
+ |
+ // DefaultEnable indicates whether or not caching is globally enabled or |
+ // disabled by default. Can still be overridden by CacheEnableMeta. |
+ DefaultEnabled = true |
+) |
+ |
+const ( |
+ MemcacheVersion = "1" |
+ |
+ // KeyFormat is the format string used to generate memcache keys. It's |
+ // gae:<version>:<shard#>:<base64_std_nopad(sha1(datastore.Key))> |
+ KeyFormat = "gae:" + MemcacheVersion + ":%x:%s" |
+ Sha1B64Padding = 1 |
+ Sha1B64Size = 28 - Sha1B64Padding |
+ |
+ MaxShards = 256 |
+ MaxShardsLen = len("ff") |
+ InternalGAEPadding = 96 |
+ ValueSizeLimit = (1000 * 1000) - InternalGAEPadding - MaxShardsLen |
+ |
+ CacheEnableMeta = "dscache.enable" |
+ CacheExpirationMeta = "dscache.expiration" |
+ |
+ // NonceUint32s is the number of 32 bit uints to use in the 'lock' nonce. |
+ NonceUint32s = 2 |
+ |
+ // GlobalEnabledCheckInterval is how frequently IsGloballyEnabled should check |
+ // the globalEnabled datastore entry. |
+ GlobalEnabledCheckInterval = 5 * time.Minute |
+) |
+ |
+// internalValueSizeLimit is a var for testing purposes. |
+var internalValueSizeLimit = ValueSizeLimit |
+ |
+type CompressionType byte |
+ |
+const ( |
+ NoCompression CompressionType = iota |
+ ZlibCompression |
+) |
+ |
+func (c CompressionType) String() string { |
+ switch c { |
+ case NoCompression: |
+ return "NoCompression" |
+ case ZlibCompression: |
+ return "ZlibCompression" |
+ default: |
+ return fmt.Sprintf("UNKNOWN_CompressionType(%d)", c) |
+ } |
+} |
+ |
+// FlagValue is used to indicate if a memcache entry currently contains an |
+// item or a lock. |
+type FlagValue uint32 |
+ |
+const ( |
+ ItemUKNONWN FlagValue = iota |
+ ItemHasData |
+ ItemHasLock |
+) |
+ |
+func MakeMemcacheKey(shard int, k datastore.Key) string { |
+ return fmt.Sprintf(KeyFormat, shard, HashKey(k)) |
+} |
+ |
+func HashKey(k datastore.Key) string { |
+ // errs can't happen, since we're using a byte buffer. |
+ buf := bytes.Buffer{} |
+ _ = datastore.WriteKey(&buf, datastore.WithoutContext, k) |
+ dgst := sha1.Sum(buf.Bytes()) |
+ buf.Reset() |
+ enc := base64.NewEncoder(base64.StdEncoding, &buf) |
+ _, _ = enc.Write(dgst[:]) |
+ enc.Close() |
+ return buf.String()[:buf.Len()-Sha1B64Padding] |
+} |
+ |
+type GlobalConfig struct { |
dnj
2015/08/07 16:30:06
IMO move to a separate ".go" file.
iannucci
2015/08/07 19:41:00
Done.
|
+ _id int64 `gae:"$id,1"` |
+ _kind string `gae:"$kind,dscache"` |
+ |
+ Enable bool |
+} |
+ |
+var ( |
+ globalEnabledLock = sync.RWMutex{} |
+ |
+ // globalEnabled is whether or not memcache has been globally enabled. It is |
+ // populated by IsGloballyEnabled when SetDynamicGlobalEnable has been set to |
+ // true. |
+ globalEnabled = true |
+ |
+ // globalEnabledNextCheck is IsGloballyEnabled's last successful check of the |
+ // global disable key. |
+ globalEnabledNextCheck = time.Time{} |
+) |
+ |
+// IsGloballyEnabled checks to see if this filter is enabled globally. |
+// |
+// This checks InstanceEnabledStatic, as well as polls the datastore entity |
+// /dscache,1 (a GlobalConfig instance) |
+// Once every GlobalEnabledCheckInterval. |
+// |
+// For correctness, any error encountered returns true. If this assumed false, |
+// then Put operations might incorrectly invalidate the cache. |
+func IsGloballyEnabled(c context.Context) bool { |
+ if !InstanceEnabledStatic { |
+ return false |
+ } |
+ |
+ now := clock.Now(c) |
+ |
+ globalEnabledLock.RLock() |
dnj
2015/08/07 16:30:07
For peace of mind (and to avoid C-style cleanup pa
iannucci
2015/08/07 19:41:00
derp, dunno what I was thinking. did this.
|
+ if now.Before(globalEnabledNextCheck) { |
+ globalEnabledLock.RUnlock() |
+ return globalEnabled |
+ } |
+ globalEnabledLock.RUnlock() |
+ |
+ globalEnabledLock.Lock() |
+ defer globalEnabledLock.Unlock() |
+ // just in case we raced |
+ if now.Before(globalEnabledNextCheck) { |
+ return globalEnabled |
+ } |
+ |
+ // alawys go to the default namespace |
dnj
2015/08/07 16:30:06
always
iannucci
2015/08/07 19:41:00
Done.
|
+ c, err := info.Get(c).Namespace("") |
+ if err != nil { |
+ return true |
dnj
2015/08/07 16:30:06
When the memcache failure case can perpetuate stal
iannucci
2015/08/07 19:41:00
Nope, because then you could end up doing Put oper
|
+ } |
+ cfg := &GlobalConfig{Enable: true} |
+ if err := datastore.Get(c).Get(cfg); err != nil && err != datastore.ErrNoSuchEntity { |
+ return true |
+ } |
+ globalEnabled = cfg.Enable |
+ globalEnabledNextCheck = now.Add(GlobalEnabledCheckInterval) |
+ return globalEnabled |
dnj
2015/08/07 16:30:06
Just return "cfg.Enable".
iannucci
2015/08/07 19:41:00
I actually prefer not to. globalEnabled is the val
|
+} |
+ |
+func SetDynamicGlobalEnable(c context.Context, memcacheEnabled bool) error { |
dnj
2015/08/07 16:30:07
Why "Dynamic"? Seems inconsistent with the other v
iannucci
2015/08/07 19:41:00
old naming artifact. fixed.
|
+ // alawys go to the default namespace |
+ c, err := info.Get(c).Namespace("") |
+ if err != nil { |
+ return err |
+ } |
+ return datastore.Get(c).RunInTransaction(func(c context.Context) error { |
+ ds := datastore.Get(c) |
+ cfg := &GlobalConfig{Enable: true} |
+ if err := ds.Get(cfg); err != nil && err != datastore.ErrNoSuchEntity { |
+ return err |
+ } |
+ if cfg.Enable == memcacheEnabled { |
+ return nil |
+ } |
+ cfg.Enable = memcacheEnabled |
+ if memcacheEnabled { |
+ // when going false -> true, wipe memcache. |
+ if err := memcache.Get(c).Flush(); err != nil { |
+ return err |
+ } |
+ } |
+ return ds.Put(cfg) |
+ }, nil) |
+} |