| Index: filter/dscache/dscache.go
|
| diff --git a/filter/dscache/dscache.go b/filter/dscache/dscache.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..5fcb9f756bed4af237b44dc53ab7bda2b9bf9f4f
|
| --- /dev/null
|
| +++ b/filter/dscache/dscache.go
|
| @@ -0,0 +1,304 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package dscache
|
| +
|
| +import (
|
| + "bytes"
|
| + "compress/zlib"
|
| + "crypto/sha1"
|
| + "encoding/base64"
|
| + "fmt"
|
| + "io/ioutil"
|
| + "math/rand"
|
| + "reflect"
|
| + "time"
|
| +
|
| + "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/gae/service/memcache"
|
| +)
|
| +
|
| +var (
|
| + // LockTimeSeconds is the number of seconds that a "lock" memcache entry will
|
| + // have its expiration set to. It's set to just over half of the frontend
|
| + // request handler timeout (currently 60 seconds).
|
| + LockTimeSeconds = 31
|
| +
|
| + // CacheTimeSeconds is the default number of seconds that a positively cached
|
| + // entity will be retained (memcache contention notwithstanding). A value of
|
| + // 0 is infinite.
|
| + CacheTimeSeconds int64 = 0
|
| +
|
| + // CompressionThreshold is the number of bytes of entity value after which
|
| + // compression kicks in.
|
| + CompressionThreshold = 860
|
| +
|
| + // DefaultShards is the default number of key sharding to do.
|
| + DefaultShards int = 1
|
| +
|
| + // DefaultEnable indicates whether or not caching is globally enabled or
|
| + // disabled by default. Can still be overridden by CacheEnableMeta.
|
| + DefaultEnabled = true
|
| +
|
| + // ShardsForKey is a user-controllable global function which calculates
|
| + // the number of shards to use for a certain datastore key. The provided
|
| + // key will always be non-nil, complete and valid.
|
| + //
|
| + // The # shards returned may be between 1 and 256. Values above this range
|
| + // will be clamped into that range. A return value of 0 means that NO cache
|
| + // operations should be done for this key, regardless of the dscache.enable
|
| + // setting.
|
| + //
|
| + // If ShardsForKey is nil, the value of DefaultShards is used.
|
| + ShardsForKey func(datastore.Key) int
|
| +)
|
| +
|
| +const (
|
| + MemcacheVersion = "1"
|
| +
|
| + // KeyFormat is the format string used to generate memcache keys. It's
|
| + // gae:<version>:<shard#>:<base64_std_nopad(sha1(datastore.Key))>
|
| + KeyFormat = "gae:" + MemcacheVersion + ":%x:%s"
|
| + Sha1B64Padding = 1
|
| + Sha1B64Size = 28 - Sha1B64Padding
|
| +
|
| + MaxShards = 256
|
| + MaxShardsLen = len("ff")
|
| + InternalGAEPadding = 96
|
| + ValueSizeLimit = (1000 * 1000) - InternalGAEPadding - MaxShardsLen
|
| +
|
| + CacheEnableMeta = "dscache.enable"
|
| + CacheExpirationMeta = "dscache.expiration"
|
| +
|
| + // NonceBytes is the number of bytes to use in the 'lock' nonce. It must be
|
| + // a multiple of 4.
|
| + NonceBytes = 8
|
| +)
|
| +
|
| +type CompressionType byte
|
| +
|
| +const (
|
| + NoCompression CompressionType = iota
|
| + ZlibCompression
|
| +)
|
| +
|
| +func (c CompressionType) String() string {
|
| + switch c {
|
| + case NoCompression:
|
| + return "NoCompression"
|
| + case ZlibCompression:
|
| + return "ZlibCompression"
|
| + default:
|
| + return fmt.Sprintf("UNKNOWN_CompressionType(%d)", c)
|
| + }
|
| +}
|
| +
|
| +// FlagValue is used to indicate if a memcache entry currently contains an
|
| +// item or a lock.
|
| +type FlagValue uint32
|
| +
|
| +const (
|
| + ItemHasData FlagValue = iota
|
| + ItemHasLock
|
| +)
|
| +
|
| +func meta(val datastore.PropertyMap, key string, dflt interface{}) interface{} {
|
| + ret, err := val.GetMeta(key)
|
| + if err == datastore.ErrMetaFieldUnset {
|
| + return dflt
|
| + }
|
| + if reflect.TypeOf(dflt) != reflect.TypeOf(ret) {
|
| + // TODO(riannucci): Panic? Log a warning?
|
| + return dflt
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +func numShards(k datastore.Key) int {
|
| + if datastore.KeyIncomplete(k) {
|
| + return 0
|
| + }
|
| + ret := DefaultShards
|
| + if ShardsForKey != nil {
|
| + ret = ShardsForKey(k)
|
| + }
|
| + if ret < 1 {
|
| + return 0 // disable caching entirely
|
| + }
|
| + if ret > MaxShards {
|
| + ret = MaxShards
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +func mkKeySuffix(k datastore.Key) string {
|
| + buf := bytes.Buffer{}
|
| + if err := datastore.WriteKey(&buf, datastore.WithoutContext, k); err != nil {
|
| + panic(err) // can't happen, since we're using a byte buffer.
|
| + }
|
| + dgst := sha1.Sum(buf.Bytes())
|
| + buf.Reset()
|
| + enc := base64.NewEncoder(base64.StdEncoding, &buf)
|
| + _, err := enc.Write(dgst[:])
|
| + enc.Close()
|
| + if err != nil {
|
| + panic(err) // can't happen, since we're using a byte buffer.
|
| + }
|
| + return buf.String()[:buf.Len()-Sha1B64Padding]
|
| +}
|
| +
|
| +func mkRandKeys(mr *rand.Rand, keys []datastore.Key) []string {
|
| + ret := make([]string, len(keys))
|
| + for i, key := range keys {
|
| + shards := numShards(key)
|
| + if shards == 0 {
|
| + continue
|
| + }
|
| + ret[i] = fmt.Sprintf(KeyFormat, mr.Intn(shards), mkKeySuffix(key))
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +func mkAllKeys(keys []datastore.Key) []string {
|
| + size := 0
|
| + nums := make([]int, len(keys))
|
| + for i, k := range keys {
|
| + shards := numShards(k)
|
| + nums[i] = shards
|
| + size += shards
|
| + }
|
| + if size == 0 {
|
| + return nil
|
| + }
|
| + ret := make([]string, 0, size)
|
| + for i, key := range keys {
|
| + keySuffix := mkKeySuffix(key)
|
| + for shard := 0; shard < nums[i]; shard++ {
|
| + ret = append(ret, fmt.Sprintf(KeyFormat, shard, keySuffix))
|
| + }
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +func init() {
|
| + // TODO(riannucci): remove this when using crypto/rand instead of math/rand.
|
| + if NonceBytes%4 != 0 {
|
| + panic("NonceBytes must be a multiple of 4")
|
| + }
|
| +}
|
| +
|
| +// crappyNonce creates a really crapyp nonce using math/rand. This is generally
|
| +// unacceptable for cryptographic purposes, but since mathrand is the only
|
| +// mocked randomness source, we use that.
|
| +//
|
| +// Do not use this function for anything other than mkRandLockItems or your hair
|
| +// will fall out. You've been warned.
|
| +//
|
| +// TODO(riannucci): make this use crypto/rand instead
|
| +func crappyNonce(mr *rand.Rand) []byte {
|
| + ret := make([]byte, NonceBytes)
|
| + for w := uint(0); w < NonceBytes/4; w++ {
|
| + word := mr.Uint32()
|
| + for i := uint(0); i < 4; i++ {
|
| + ret[(w*4)+i] = byte(word >> (8 * i))
|
| + }
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +func mkRandLockItems(mc memcache.Interface, mr *rand.Rand, keys []datastore.Key) []memcache.Item {
|
| + mcKeys := mkRandKeys(mr, keys)
|
| + if mcKeys == nil {
|
| + return nil
|
| + }
|
| + nonce := crappyNonce(mr)
|
| + ret := make([]memcache.Item, len(mcKeys))
|
| + for i := range ret {
|
| + ret[i] = (mc.NewItem(mcKeys[i]).
|
| + SetFlags(uint32(ItemHasLock)).
|
| + SetExpiration(time.Second * time.Duration(LockTimeSeconds)).
|
| + SetValue(nonce))
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +func mkAllLockItems(mc memcache.Interface, keys []datastore.Key) ([]memcache.Item, []string) {
|
| + mcKeys := mkAllKeys(keys)
|
| + if mcKeys == nil {
|
| + return nil, nil
|
| + }
|
| + ret := make([]memcache.Item, len(mcKeys))
|
| + for i := range ret {
|
| + ret[i] = (mc.NewItem(mcKeys[i]).
|
| + SetFlags(uint32(ItemHasLock)).
|
| + SetExpiration(time.Second * time.Duration(LockTimeSeconds)))
|
| + }
|
| + return ret, mcKeys
|
| +}
|
| +
|
| +func mkItemData(pm datastore.PropertyMap) ([]byte, error) {
|
| + pm, _ = pm.Save(false)
|
| +
|
| + data := []byte(nil)
|
| +
|
| + if pm.DataLen() > 0 {
|
| + buf := bytes.Buffer{}
|
| + if err := buf.WriteByte(byte(NoCompression)); err != nil {
|
| + panic(err) // can't happen, since we're using a byte buffer.
|
| + }
|
| + if err := pm.Write(&buf, datastore.WithoutContext); err != nil {
|
| + panic(err) // can't happen, since we're using a byte buffer.
|
| + }
|
| +
|
| + data = buf.Bytes()
|
| + if buf.Len() > CompressionThreshold {
|
| + buf2 := bytes.NewBuffer(make([]byte, 0, len(data)))
|
| + if err := buf2.WriteByte(byte(ZlibCompression)); err != nil {
|
| + panic(err) // can't happen, since we're using a byte buffer.
|
| + }
|
| + writer := zlib.NewWriter(buf2)
|
| + _, err := writer.Write(data[1:]) // skip the NoCompression byte
|
| + writer.Close()
|
| + if err != nil {
|
| + panic(err) // can't happen, since we're using a byte buffer.
|
| + }
|
| + data = buf2.Bytes()
|
| + }
|
| + if len(data) > ValueSizeLimit {
|
| + // TODO(riannucci): should we cache the fact that it's too big?
|
| + data = nil // disables memcaching
|
| + }
|
| + }
|
| +
|
| + return data, nil
|
| +}
|
| +
|
| +func decodeValue(val []byte, ns, aid string) (datastore.PropertyMap, error) {
|
| + if len(val) == 0 {
|
| + return nil, datastore.ErrNoSuchEntity
|
| + }
|
| + buf := bytes.NewBuffer(val)
|
| + compTypeByte, err := buf.ReadByte()
|
| + if err != nil {
|
| + panic(err) // can't happen, since we're using a byte buffer.
|
| + }
|
| + if CompressionType(compTypeByte) == ZlibCompression {
|
| + reader, err := zlib.NewReader(buf)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + defer reader.Close()
|
| + data, err := ioutil.ReadAll(reader)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + buf = bytes.NewBuffer(data)
|
| + }
|
| + ret := datastore.PropertyMap{}
|
| + err = ret.Read(buf, datastore.WithoutContext, ns, aid)
|
| + return ret, err
|
| +}
|
| +
|
| +// TODO(riannucci): Should there be a way to purge the cache entries for a range
|
| +// of datastore keys?
|
|
|