Index: filter/dscache/dscache.go |
diff --git a/filter/dscache/dscache.go b/filter/dscache/dscache.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..5fcb9f756bed4af237b44dc53ab7bda2b9bf9f4f |
--- /dev/null |
+++ b/filter/dscache/dscache.go |
@@ -0,0 +1,304 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package dscache |
+ |
+import ( |
+ "bytes" |
+ "compress/zlib" |
+ "crypto/sha1" |
+ "encoding/base64" |
+ "fmt" |
+ "io/ioutil" |
+ "math/rand" |
+ "reflect" |
+ "time" |
+ |
+ "github.com/luci/gae/service/datastore" |
+ "github.com/luci/gae/service/memcache" |
+) |
+ |
+var ( |
+ // LockTimeSeconds is the number of seconds that a "lock" memcache entry will |
+ // have its expiration set to. It's set to just over half of the frontend |
+ // request handler timeout (currently 60 seconds). |
+ LockTimeSeconds = 31 |
+ |
+ // CacheTimeSeconds is the default number of seconds that a positively cached |
+ // entity will be retained (memcache contention notwithstanding). A value of |
+ // 0 is infinite. |
+ CacheTimeSeconds int64 = 0 |
+ |
+ // CompressionThreshold is the number of bytes of entity value after which |
+ // compression kicks in. |
+ CompressionThreshold = 860 |
+ |
+ // DefaultShards is the default number of key sharding to do. |
+ DefaultShards int = 1 |
+ |
+ // DefaultEnable indicates whether or not caching is globally enabled or |
+ // disabled by default. Can still be overridden by CacheEnableMeta. |
+ DefaultEnabled = true |
+ |
+ // ShardsForKey is a user-controllable global function which calculates |
+ // the number of shards to use for a certain datastore key. The provided |
+ // key will always be non-nil, complete and valid. |
+ // |
+ // The # shards returned may be between 1 and 256. Values above this range |
+ // will be clamped into that range. A return value of 0 means that NO cache |
+ // operations should be done for this key, regardless of the dscache.enable |
+ // setting. |
+ // |
+ // If ShardsForKey is nil, the value of DefaultShards is used. |
+ ShardsForKey func(datastore.Key) int |
+) |
+ |
+const ( |
+ MemcacheVersion = "1" |
+ |
+ // KeyFormat is the format string used to generate memcache keys. It's |
+ // gae:<version>:<shard#>:<base64_std_nopad(sha1(datastore.Key))> |
+ KeyFormat = "gae:" + MemcacheVersion + ":%x:%s" |
+ Sha1B64Padding = 1 |
+ Sha1B64Size = 28 - Sha1B64Padding |
+ |
+ MaxShards = 256 |
+ MaxShardsLen = len("ff") |
+ InternalGAEPadding = 96 |
+ ValueSizeLimit = (1000 * 1000) - InternalGAEPadding - MaxShardsLen |
+ |
+ CacheEnableMeta = "dscache.enable" |
+ CacheExpirationMeta = "dscache.expiration" |
+ |
+ // NonceBytes is the number of bytes to use in the 'lock' nonce. It must be |
+ // a multiple of 4. |
+ NonceBytes = 8 |
+) |
+ |
+type CompressionType byte |
+ |
+const ( |
+ NoCompression CompressionType = iota |
+ ZlibCompression |
+) |
+ |
+func (c CompressionType) String() string { |
+ switch c { |
+ case NoCompression: |
+ return "NoCompression" |
+ case ZlibCompression: |
+ return "ZlibCompression" |
+ default: |
+ return fmt.Sprintf("UNKNOWN_CompressionType(%d)", c) |
+ } |
+} |
+ |
+// FlagValue is used to indicate if a memcache entry currently contains an |
+// item or a lock. |
+type FlagValue uint32 |
+ |
+const ( |
+ ItemHasData FlagValue = iota |
+ ItemHasLock |
+) |
+ |
+func meta(val datastore.PropertyMap, key string, dflt interface{}) interface{} { |
+ ret, err := val.GetMeta(key) |
+ if err == datastore.ErrMetaFieldUnset { |
+ return dflt |
+ } |
+ if reflect.TypeOf(dflt) != reflect.TypeOf(ret) { |
+ // TODO(riannucci): Panic? Log a warning? |
+ return dflt |
+ } |
+ return ret |
+} |
+ |
+func numShards(k datastore.Key) int { |
+ if datastore.KeyIncomplete(k) { |
+ return 0 |
+ } |
+ ret := DefaultShards |
+ if ShardsForKey != nil { |
+ ret = ShardsForKey(k) |
+ } |
+ if ret < 1 { |
+ return 0 // disable caching entirely |
+ } |
+ if ret > MaxShards { |
+ ret = MaxShards |
+ } |
+ return ret |
+} |
+ |
+func mkKeySuffix(k datastore.Key) string { |
+ buf := bytes.Buffer{} |
+ if err := datastore.WriteKey(&buf, datastore.WithoutContext, k); err != nil { |
+ panic(err) // can't happen, since we're using a byte buffer. |
+ } |
+ dgst := sha1.Sum(buf.Bytes()) |
+ buf.Reset() |
+ enc := base64.NewEncoder(base64.StdEncoding, &buf) |
+ _, err := enc.Write(dgst[:]) |
+ enc.Close() |
+ if err != nil { |
+ panic(err) // can't happen, since we're using a byte buffer. |
+ } |
+ return buf.String()[:buf.Len()-Sha1B64Padding] |
+} |
+ |
+func mkRandKeys(mr *rand.Rand, keys []datastore.Key) []string { |
+ ret := make([]string, len(keys)) |
+ for i, key := range keys { |
+ shards := numShards(key) |
+ if shards == 0 { |
+ continue |
+ } |
+ ret[i] = fmt.Sprintf(KeyFormat, mr.Intn(shards), mkKeySuffix(key)) |
+ } |
+ return ret |
+} |
+ |
+func mkAllKeys(keys []datastore.Key) []string { |
+ size := 0 |
+ nums := make([]int, len(keys)) |
+ for i, k := range keys { |
+ shards := numShards(k) |
+ nums[i] = shards |
+ size += shards |
+ } |
+ if size == 0 { |
+ return nil |
+ } |
+ ret := make([]string, 0, size) |
+ for i, key := range keys { |
+ keySuffix := mkKeySuffix(key) |
+ for shard := 0; shard < nums[i]; shard++ { |
+ ret = append(ret, fmt.Sprintf(KeyFormat, shard, keySuffix)) |
+ } |
+ } |
+ return ret |
+} |
+ |
+func init() { |
+ // TODO(riannucci): remove this when using crypto/rand instead of math/rand. |
+ if NonceBytes%4 != 0 { |
+ panic("NonceBytes must be a multiple of 4") |
+ } |
+} |
+ |
+// crappyNonce creates a really crapyp nonce using math/rand. This is generally |
+// unacceptable for cryptographic purposes, but since mathrand is the only |
+// mocked randomness source, we use that. |
+// |
+// Do not use this function for anything other than mkRandLockItems or your hair |
+// will fall out. You've been warned. |
+// |
+// TODO(riannucci): make this use crypto/rand instead |
+func crappyNonce(mr *rand.Rand) []byte { |
+ ret := make([]byte, NonceBytes) |
+ for w := uint(0); w < NonceBytes/4; w++ { |
+ word := mr.Uint32() |
+ for i := uint(0); i < 4; i++ { |
+ ret[(w*4)+i] = byte(word >> (8 * i)) |
+ } |
+ } |
+ return ret |
+} |
+ |
+func mkRandLockItems(mc memcache.Interface, mr *rand.Rand, keys []datastore.Key) []memcache.Item { |
+ mcKeys := mkRandKeys(mr, keys) |
+ if mcKeys == nil { |
+ return nil |
+ } |
+ nonce := crappyNonce(mr) |
+ ret := make([]memcache.Item, len(mcKeys)) |
+ for i := range ret { |
+ ret[i] = (mc.NewItem(mcKeys[i]). |
+ SetFlags(uint32(ItemHasLock)). |
+ SetExpiration(time.Second * time.Duration(LockTimeSeconds)). |
+ SetValue(nonce)) |
+ } |
+ return ret |
+} |
+ |
+func mkAllLockItems(mc memcache.Interface, keys []datastore.Key) ([]memcache.Item, []string) { |
+ mcKeys := mkAllKeys(keys) |
+ if mcKeys == nil { |
+ return nil, nil |
+ } |
+ ret := make([]memcache.Item, len(mcKeys)) |
+ for i := range ret { |
+ ret[i] = (mc.NewItem(mcKeys[i]). |
+ SetFlags(uint32(ItemHasLock)). |
+ SetExpiration(time.Second * time.Duration(LockTimeSeconds))) |
+ } |
+ return ret, mcKeys |
+} |
+ |
+func mkItemData(pm datastore.PropertyMap) ([]byte, error) { |
+ pm, _ = pm.Save(false) |
+ |
+ data := []byte(nil) |
+ |
+ if pm.DataLen() > 0 { |
+ buf := bytes.Buffer{} |
+ if err := buf.WriteByte(byte(NoCompression)); err != nil { |
+ panic(err) // can't happen, since we're using a byte buffer. |
+ } |
+ if err := pm.Write(&buf, datastore.WithoutContext); err != nil { |
+ panic(err) // can't happen, since we're using a byte buffer. |
+ } |
+ |
+ data = buf.Bytes() |
+ if buf.Len() > CompressionThreshold { |
+ buf2 := bytes.NewBuffer(make([]byte, 0, len(data))) |
+ if err := buf2.WriteByte(byte(ZlibCompression)); err != nil { |
+ panic(err) // can't happen, since we're using a byte buffer. |
+ } |
+ writer := zlib.NewWriter(buf2) |
+ _, err := writer.Write(data[1:]) // skip the NoCompression byte |
+ writer.Close() |
+ if err != nil { |
+ panic(err) // can't happen, since we're using a byte buffer. |
+ } |
+ data = buf2.Bytes() |
+ } |
+ if len(data) > ValueSizeLimit { |
+ // TODO(riannucci): should we cache the fact that it's too big? |
+ data = nil // disables memcaching |
+ } |
+ } |
+ |
+ return data, nil |
+} |
+ |
+func decodeValue(val []byte, ns, aid string) (datastore.PropertyMap, error) { |
+ if len(val) == 0 { |
+ return nil, datastore.ErrNoSuchEntity |
+ } |
+ buf := bytes.NewBuffer(val) |
+ compTypeByte, err := buf.ReadByte() |
+ if err != nil { |
+ panic(err) // can't happen, since we're using a byte buffer. |
+ } |
+ if CompressionType(compTypeByte) == ZlibCompression { |
+ reader, err := zlib.NewReader(buf) |
+ if err != nil { |
+ return nil, err |
+ } |
+ defer reader.Close() |
+ data, err := ioutil.ReadAll(reader) |
+ if err != nil { |
+ return nil, err |
+ } |
+ buf = bytes.NewBuffer(data) |
+ } |
+ ret := datastore.PropertyMap{} |
+ err = ret.Read(buf, datastore.WithoutContext, ns, aid) |
+ return ret, err |
+} |
+ |
+// TODO(riannucci): Should there be a way to purge the cache entries for a range |
+// of datastore keys? |