OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package dscache |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "compress/zlib" |
| 10 "crypto/sha1" |
| 11 "encoding/base64" |
| 12 "fmt" |
| 13 "io/ioutil" |
| 14 "math/rand" |
| 15 "reflect" |
| 16 "time" |
| 17 |
| 18 "github.com/luci/gae/service/datastore" |
| 19 "github.com/luci/gae/service/memcache" |
| 20 ) |
| 21 |
var (
	// LockTimeSeconds is the number of seconds that a "lock" memcache entry will
	// have its expiration set to. It's set to just over half of the frontend
	// request handler timeout (currently 60 seconds).
	LockTimeSeconds = 31

	// CacheTimeSeconds is the default number of seconds that a positively cached
	// entity will be retained (memcache contention notwithstanding). A value of
	// 0 is infinite.
	CacheTimeSeconds int64 = 0

	// CompressionThreshold is the number of bytes of entity value after which
	// compression kicks in.
	CompressionThreshold = 860

	// DefaultShards is the default number of key sharding to do.
	DefaultShards int = 1

	// DefaultEnabled indicates whether or not caching is globally enabled or
	// disabled by default. Can still be overridden by CacheEnableMeta.
	DefaultEnabled = true

	// ShardsForKey is a user-controllable global function which calculates
	// the number of shards to use for a certain datastore key. The provided
	// key will always be non-nil, complete and valid.
	//
	// The # shards returned may be between 1 and 256. Values above this range
	// will be clamped into that range. A return value of 0 means that NO cache
	// operations should be done for this key, regardless of the dscache.enable
	// setting.
	//
	// If ShardsForKey is nil, the value of DefaultShards is used.
	ShardsForKey func(datastore.Key) int
)
| 56 |
const (
	// MemcacheVersion is the version tag embedded in every memcache key (see
	// KeyFormat); changing it changes every generated key.
	MemcacheVersion = "1"

	// KeyFormat is the format string used to generate memcache keys. It's
	// gae:<version>:<shard#>:<base64_std_nopad(sha1(datastore.Key))>
	KeyFormat = "gae:" + MemcacheVersion + ":%x:%s"

	// Sha1B64Padding is the number of '=' padding characters that the standard
	// base64 encoding of a sha1 digest (20 bytes -> 28 chars) ends with;
	// mkKeySuffix strips them.
	Sha1B64Padding = 1

	// Sha1B64Size is the length of the base64-encoded sha1 key suffix after
	// the padding has been removed.
	Sha1B64Size = 28 - Sha1B64Padding

	// MaxShards is the upper bound that numShards clamps ShardsForKey's
	// return value to.
	MaxShards = 256

	// MaxShardsLen is the length of the longest hex-encoded shard number
	// ("ff" for shard 255, matching the %x in KeyFormat).
	MaxShardsLen = len("ff")

	// InternalGAEPadding is the assumed per-item overhead (in bytes) added by
	// the memcache service on top of the stored value.
	// NOTE(review): value looks empirical — confirm against memcache limits.
	InternalGAEPadding = 96

	// ValueSizeLimit is the largest encoded entity value (in bytes) that will
	// be stored in memcache; mkItemData refuses to cache anything larger.
	ValueSizeLimit = (1000 * 1000) - InternalGAEPadding - MaxShardsLen

	// CacheEnableMeta is the entity metadata key used to enable/disable
	// caching on a per-entity basis (overrides DefaultEnabled).
	CacheEnableMeta = "dscache.enable"

	// CacheExpirationMeta is the entity metadata key used to set a per-entity
	// cache expiration (presumably in seconds, like CacheTimeSeconds — TODO
	// confirm at the point of use).
	CacheExpirationMeta = "dscache.expiration"

	// NonceBytes is the number of bytes to use in the 'lock' nonce. It must be
	// a multiple of 4.
	NonceBytes = 8
)
| 78 |
// CompressionType identifies how a cached entity value is encoded. It is
// written as the first byte of every non-empty cached value (see mkItemData
// and decodeValue).
type CompressionType byte

// The supported value encodings.
const (
	NoCompression CompressionType = iota
	ZlibCompression
)

// String returns a human-readable name for the compression type.
func (c CompressionType) String() string {
	names := [...]string{"NoCompression", "ZlibCompression"}
	if int(c) < len(names) {
		return names[c]
	}
	return fmt.Sprintf("UNKNOWN_CompressionType(%d)", c)
}
| 96 |
// FlagValue is used to indicate if a memcache entry currently contains an
// item or a lock.
type FlagValue uint32

const (
	// ItemHasData marks an entry whose value is a serialized entity (a
	// zero-length value is decoded as "no such entity" — see decodeValue).
	ItemHasData FlagValue = iota

	// ItemHasLock marks an entry that is a lock; its value is a nonce (see
	// mkRandLockItems) or empty (see mkAllLockItems), never entity data.
	ItemHasLock
)
| 105 |
| 106 func meta(val datastore.PropertyMap, key string, dflt interface{}) interface{} { |
| 107 ret, err := val.GetMeta(key) |
| 108 if err == datastore.ErrMetaFieldUnset { |
| 109 return dflt |
| 110 } |
| 111 if reflect.TypeOf(dflt) != reflect.TypeOf(ret) { |
| 112 // TODO(riannucci): Panic? Log a warning? |
| 113 return dflt |
| 114 } |
| 115 return ret |
| 116 } |
| 117 |
| 118 func numShards(k datastore.Key) int { |
| 119 if datastore.KeyIncomplete(k) { |
| 120 return 0 |
| 121 } |
| 122 ret := DefaultShards |
| 123 if ShardsForKey != nil { |
| 124 ret = ShardsForKey(k) |
| 125 } |
| 126 if ret < 1 { |
| 127 return 0 // disable caching entirely |
| 128 } |
| 129 if ret > MaxShards { |
| 130 ret = MaxShards |
| 131 } |
| 132 return ret |
| 133 } |
| 134 |
| 135 func mkKeySuffix(k datastore.Key) string { |
| 136 buf := bytes.Buffer{} |
| 137 if err := datastore.WriteKey(&buf, datastore.WithoutContext, k); err !=
nil { |
| 138 panic(err) // can't happen, since we're using a byte buffer. |
| 139 } |
| 140 dgst := sha1.Sum(buf.Bytes()) |
| 141 buf.Reset() |
| 142 enc := base64.NewEncoder(base64.StdEncoding, &buf) |
| 143 _, err := enc.Write(dgst[:]) |
| 144 enc.Close() |
| 145 if err != nil { |
| 146 panic(err) // can't happen, since we're using a byte buffer. |
| 147 } |
| 148 return buf.String()[:buf.Len()-Sha1B64Padding] |
| 149 } |
| 150 |
| 151 func mkRandKeys(mr *rand.Rand, keys []datastore.Key) []string { |
| 152 ret := make([]string, len(keys)) |
| 153 for i, key := range keys { |
| 154 shards := numShards(key) |
| 155 if shards == 0 { |
| 156 continue |
| 157 } |
| 158 ret[i] = fmt.Sprintf(KeyFormat, mr.Intn(shards), mkKeySuffix(key
)) |
| 159 } |
| 160 return ret |
| 161 } |
| 162 |
| 163 func mkAllKeys(keys []datastore.Key) []string { |
| 164 size := 0 |
| 165 nums := make([]int, len(keys)) |
| 166 for i, k := range keys { |
| 167 shards := numShards(k) |
| 168 nums[i] = shards |
| 169 size += shards |
| 170 } |
| 171 if size == 0 { |
| 172 return nil |
| 173 } |
| 174 ret := make([]string, 0, size) |
| 175 for i, key := range keys { |
| 176 keySuffix := mkKeySuffix(key) |
| 177 for shard := 0; shard < nums[i]; shard++ { |
| 178 ret = append(ret, fmt.Sprintf(KeyFormat, shard, keySuffi
x)) |
| 179 } |
| 180 } |
| 181 return ret |
| 182 } |
| 183 |
func init() {
	// TODO(riannucci): remove this when using crypto/rand instead of math/rand.
	// crappyNonce fills the nonce one uint32 (4 bytes) at a time, so a
	// NonceBytes that isn't a multiple of 4 would be a programming error.
	if NonceBytes%4 != 0 {
		panic("NonceBytes must be a multiple of 4")
	}
}
| 190 |
// crappyNonce creates a really crappy nonce using math/rand. This is generally
// unacceptable for cryptographic purposes, but since mathrand is the only
// mocked randomness source, we use that.
//
// Do not use this function for anything other than mkRandLockItems or your hair
// will fall out. You've been warned.
//
// TODO(riannucci): make this use crypto/rand instead
func crappyNonce(mr *rand.Rand) []byte {
	ret := make([]byte, NonceBytes)
	for w := uint(0); w < NonceBytes/4; w++ {
		word := mr.Uint32()
		// Unpack the 32-bit word into the next 4 bytes, least-significant
		// byte first.
		for i := uint(0); i < 4; i++ {
			ret[(w*4)+i] = byte(word >> (8 * i))
		}
	}
	return ret
}
| 209 |
| 210 func mkRandLockItems(mc memcache.Interface, mr *rand.Rand, keys []datastore.Key)
[]memcache.Item { |
| 211 mcKeys := mkRandKeys(mr, keys) |
| 212 if mcKeys == nil { |
| 213 return nil |
| 214 } |
| 215 nonce := crappyNonce(mr) |
| 216 ret := make([]memcache.Item, len(mcKeys)) |
| 217 for i := range ret { |
| 218 ret[i] = (mc.NewItem(mcKeys[i]). |
| 219 SetFlags(uint32(ItemHasLock)). |
| 220 SetExpiration(time.Second * time.Duration(LockTimeSecond
s)). |
| 221 SetValue(nonce)) |
| 222 } |
| 223 return ret |
| 224 } |
| 225 |
| 226 func mkAllLockItems(mc memcache.Interface, keys []datastore.Key) ([]memcache.Ite
m, []string) { |
| 227 mcKeys := mkAllKeys(keys) |
| 228 if mcKeys == nil { |
| 229 return nil, nil |
| 230 } |
| 231 ret := make([]memcache.Item, len(mcKeys)) |
| 232 for i := range ret { |
| 233 ret[i] = (mc.NewItem(mcKeys[i]). |
| 234 SetFlags(uint32(ItemHasLock)). |
| 235 SetExpiration(time.Second * time.Duration(LockTimeSecond
s))) |
| 236 } |
| 237 return ret, mcKeys |
| 238 } |
| 239 |
| 240 func mkItemData(pm datastore.PropertyMap) ([]byte, error) { |
| 241 pm, _ = pm.Save(false) |
| 242 |
| 243 data := []byte(nil) |
| 244 |
| 245 if pm.DataLen() > 0 { |
| 246 buf := bytes.Buffer{} |
| 247 if err := buf.WriteByte(byte(NoCompression)); err != nil { |
| 248 panic(err) // can't happen, since we're using a byte buf
fer. |
| 249 } |
| 250 if err := pm.Write(&buf, datastore.WithoutContext); err != nil { |
| 251 panic(err) // can't happen, since we're using a byte buf
fer. |
| 252 } |
| 253 |
| 254 data = buf.Bytes() |
| 255 if buf.Len() > CompressionThreshold { |
| 256 buf2 := bytes.NewBuffer(make([]byte, 0, len(data))) |
| 257 if err := buf2.WriteByte(byte(ZlibCompression)); err !=
nil { |
| 258 panic(err) // can't happen, since we're using a
byte buffer. |
| 259 } |
| 260 writer := zlib.NewWriter(buf2) |
| 261 _, err := writer.Write(data[1:]) // skip the NoCompressi
on byte |
| 262 writer.Close() |
| 263 if err != nil { |
| 264 panic(err) // can't happen, since we're using a
byte buffer. |
| 265 } |
| 266 data = buf2.Bytes() |
| 267 } |
| 268 if len(data) > ValueSizeLimit { |
| 269 // TODO(riannucci): should we cache the fact that it's t
oo big? |
| 270 data = nil // disables memcaching |
| 271 } |
| 272 } |
| 273 |
| 274 return data, nil |
| 275 } |
| 276 |
| 277 func decodeValue(val []byte, ns, aid string) (datastore.PropertyMap, error) { |
| 278 if len(val) == 0 { |
| 279 return nil, datastore.ErrNoSuchEntity |
| 280 } |
| 281 buf := bytes.NewBuffer(val) |
| 282 compTypeByte, err := buf.ReadByte() |
| 283 if err != nil { |
| 284 panic(err) // can't happen, since we're using a byte buffer. |
| 285 } |
| 286 if CompressionType(compTypeByte) == ZlibCompression { |
| 287 reader, err := zlib.NewReader(buf) |
| 288 if err != nil { |
| 289 return nil, err |
| 290 } |
| 291 defer reader.Close() |
| 292 data, err := ioutil.ReadAll(reader) |
| 293 if err != nil { |
| 294 return nil, err |
| 295 } |
| 296 buf = bytes.NewBuffer(data) |
| 297 } |
| 298 ret := datastore.PropertyMap{} |
| 299 err = ret.Read(buf, datastore.WithoutContext, ns, aid) |
| 300 return ret, err |
| 301 } |
| 302 |
| 303 // TODO(riannucci): Should there be a way to purge the cache entries for a range |
| 304 // of datastore keys? |
OLD | NEW |