Index: impl/cloud/memcache.go |
diff --git a/impl/cloud/memcache.go b/impl/cloud/memcache.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..274fed50dae27fc499173cd23a71f3fd63811862 |
--- /dev/null |
+++ b/impl/cloud/memcache.go |
@@ -0,0 +1,272 @@ |
+// Copyright 2016 The LUCI Authors. All rights reserved. |
+// Use of this source code is governed under the Apache License, Version 2.0 |
+// that can be found in the LICENSE file. |
+ |
+package cloud |
+ |
+import ( |
+ "crypto/sha256" |
+ "encoding/hex" |
+ "strconv" |
+ "time" |
+ |
+ "github.com/luci/gae/service/info" |
+ mc "github.com/luci/gae/service/memcache" |
+ |
+ "github.com/bradfitz/gomemcache/memcache" |
+ "golang.org/x/net/context" |
+) |
+ |
+// memcacheKeyPrefix is the common prefix prepended to memcached keys created |
+// by this package. It is intended to ensure that keys do not conflict with |
+// other users of the service. |
+const memcacheKeyPrefix = "github.com/luci/gae/impl/cloud:" |
+ |
+// memcacheClient is a "service/memcache" implementation built on top of a |
+// "memcached" client connection. |
+// |
+// Because "memcached" has no concept of a namespace, we differentiate memcache |
+// entries by prepending "memcacheKeyPrefix:SHA256(namespace):" to each key. |
iannucci
2016/11/20 19:35:16
note that this is basically how it's done in prod.
dnj
2016/11/20 22:28:47
Oh cool, I'll add null-byte restriction and >250 b
dnj
2016/11/21 05:24:33
Actually on second thought:
The null-byte restric
|
+type memcacheClient struct { |
+ client *memcache.Client |
+} |
+ |
+func (m *memcacheClient) use(c context.Context) context.Context { |
+ return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface { |
+ return bindMemcacheClient(m, info.GetNamespace(ic)) |
+ }) |
+} |
+ |
+type memcacheItem struct { |
+ native *memcache.Item |
+} |
+ |
+func (it *memcacheItem) Key() string { return it.native.Key } |
+func (it *memcacheItem) Value() []byte { return it.native.Value } |
+func (it *memcacheItem) Flags() uint32 { return it.native.Flags } |
+func (it *memcacheItem) Expiration() time.Duration { |
+ return time.Duration(it.native.Expiration) * time.Second |
+} |
+ |
+func (it *memcacheItem) SetKey(v string) mc.Item { |
+ it.native.Key = v |
+ return it |
+} |
+ |
+func (it *memcacheItem) SetValue(v []byte) mc.Item { |
+ it.native.Value = v |
+ return it |
+} |
+ |
+func (it *memcacheItem) SetFlags(v uint32) mc.Item { |
+ it.native.Flags = v |
+ return it |
+} |
+ |
+func (it *memcacheItem) SetExpiration(v time.Duration) mc.Item { |
+ it.native.Expiration = int32(v.Seconds()) |
+ return it |
+} |
+ |
+func (it *memcacheItem) SetAll(other mc.Item) { |
+ origKey := it.native.Key |
+ |
+ var on memcache.Item |
+ if other != nil { |
+ on = *(other.(*memcacheItem).native) |
+ } |
+ it.native = &on |
+ it.native.Key = origKey |
+} |
+ |
+type boundMemcacheClient struct { |
+ *memcacheClient |
+ keyPrefix string |
+} |
+ |
+func bindMemcacheClient(mc *memcacheClient, ns string) *boundMemcacheClient { |
+ nsHash := sha256.Sum256([]byte(ns)) |
+ nsPrefix := hex.EncodeToString(nsHash[:]) |
+ return &boundMemcacheClient{ |
+ memcacheClient: mc, |
+ keyPrefix: memcacheKeyPrefix + nsPrefix + ":", |
+ } |
+} |
+ |
+func (*boundMemcacheClient) newMemcacheItem(nativeKey string) *memcacheItem { |
+ return &memcacheItem{ |
+ native: &memcache.Item{ |
+ Key: nativeKey, |
+ }, |
+ } |
+} |
+ |
+func (bmc *boundMemcacheClient) makeKey(base string) string { return bmc.keyPrefix + base } |
+func (bmc *boundMemcacheClient) userKey(key string) string { return key[len(bmc.keyPrefix):] } |
+ |
+func (bmc *boundMemcacheClient) nativeItem(itm mc.Item) *memcache.Item { |
+ ni := *(itm.(*memcacheItem).native) |
+ ni.Key = bmc.makeKey(ni.Key) |
+ return &ni |
+} |
+ |
+func (bmc *boundMemcacheClient) NewItem(key string) mc.Item { return bmc.newMemcacheItem(key) } |
+ |
+func (bmc *boundMemcacheClient) AddMulti(items []mc.Item, cb mc.RawCB) error { |
+ for _, itm := range items { |
+ err := bmc.client.Add(bmc.nativeItem(itm)) |
+ cb(bmc.translateErr(err)) |
+ } |
+ return nil |
+} |
+ |
+func (bmc *boundMemcacheClient) SetMulti(items []mc.Item, cb mc.RawCB) error { |
+ for _, itm := range items { |
+ err := bmc.client.Set(bmc.nativeItem(itm)) |
+ cb(bmc.translateErr(err)) |
+ } |
+ return nil |
+} |
+ |
+func (bmc *boundMemcacheClient) GetMulti(keys []string, cb mc.RawItemCB) error { |
+ nativeKeys := make([]string, len(keys)) |
+ for i, key := range keys { |
+ nativeKeys[i] = bmc.makeKey(key) |
+ } |
+ |
+ itemMap, err := bmc.client.GetMulti(nativeKeys) |
+ if err != nil { |
+ return bmc.translateErr(err) |
+ } |
+ |
+ // Translate the item keys back to user keys. |
+ for _, v := range itemMap { |
+ v.Key = bmc.userKey(v.Key) |
+ } |
+ |
+ for _, k := range nativeKeys { |
+ if it := itemMap[k]; it != nil { |
+ cb(&memcacheItem{native: it}, nil) |
+ } else { |
+ cb(nil, mc.ErrCacheMiss) |
+ } |
+ } |
+ return nil |
+} |
+ |
+func (bmc *boundMemcacheClient) DeleteMulti(keys []string, cb mc.RawCB) error { |
+ for _, k := range keys { |
+ err := bmc.client.Delete(bmc.makeKey(k)) |
+ cb(bmc.translateErr(err)) |
+ } |
+ return nil |
+} |
+ |
+func (bmc *boundMemcacheClient) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error { |
+ for _, itm := range items { |
+ err := bmc.client.CompareAndSwap(bmc.nativeItem(itm)) |
+ cb(bmc.translateErr(err)) |
+ } |
+ return nil |
+} |
+ |
+func (bmc *boundMemcacheClient) Increment(key string, delta int64, initialValue *uint64) (uint64, error) { |
+ // key is now the native key (namespaced). |
+ key = bmc.makeKey(key) |
+ |
+ op := func() (newValue uint64, err error) { |
+ switch { |
+ case delta > 0: |
+ newValue, err = bmc.client.Increment(key, uint64(delta)) |
+ case delta < 0: |
+ newValue, err = bmc.client.Decrement(key, uint64(-delta)) |
+ default: |
+ // We don't want to change the value, but we want to return ErrNotStored |
+ // if the value doesn't exist. Use Get. |
+ _, err = bmc.client.Get(key) |
+ } |
+ err = bmc.translateErr(err) |
+ return |
+ } |
+ |
+ if initialValue == nil { |
+ return op() |
+ } |
+ |
+ // The Memcache service doesn't have an "IncrementExisting" equivalent. We |
iannucci
2016/11/20 19:35:16
Should we move this up to a top-level function? I
dnj
2016/11/20 22:28:47
TBH I think the implementation here is pretty opti
|
+ // will emulate this with other memcache operations, using Add to set the |
+ // initial value if appropriate. |
+ var ( |
+ itm *memcacheItem |
+ iv = *initialValue |
+ ) |
+ for { |
+ // Perform compare-and-swap. |
+ nv, err := op() |
+ if err != mc.ErrCacheMiss { |
+ return nv, err |
+ } |
+ |
+ // The value doesn't exist. Use "Add" to set the initial value. We will |
+ // calculate the "initial value" as if delta were applied so we can do this |
+ // in one operation. |
+ // |
+ // We only need to do this once per invocation, so we will use "itm == nil" |
+ // as a sentinel for uninitialized. |
+ if itm == nil { |
+ // Overflow wraps around (to zero), and underflow is capped at 0. |
+ if delta < 0 { |
+ udelta := uint64(-delta) |
+ if udelta >= iv { |
+ // Would underflow, cap at 0. |
+ iv = 0 |
+ } else { |
+ iv -= udelta |
+ } |
+ } else { |
+ // Apply delta. This will automatically wrap on overflow. |
+ iv += uint64(delta) |
+ } |
+ |
+ itm = bmc.newMemcacheItem(key) |
+ itm.SetValue([]byte(strconv.FormatUint(iv, 10))) |
+ } |
+ switch err := bmc.client.Add(itm.native); err { |
+ case nil: |
+ // Item was successfully set. |
+ return iv, nil |
+ |
+ case mc.ErrNotStored: |
+ // Something else set it in between "op" and "Add". Try "op" again. |
+ break |
+ |
+ default: |
+ return 0, err |
+ } |
+ } |
+} |
+ |
+func (bmc *boundMemcacheClient) Flush() error { |
+ // Unfortunately there's not really a good way to flush just a single |
+ // namespace, so Flush will flush all memcache. |
iannucci
2016/11/20 19:35:16
I'm pretty sure this is the same behavior in prod.
dnj
2016/11/20 22:28:47
Hopefully prod at least flushes per-customer. I en
iannucci
2016/11/23 21:14:37
GAE flushes all of an app's keys (but all gae-name
|
+ return bmc.translateErr(bmc.client.FlushAll()) |
+} |
+ |
+func (bmc *boundMemcacheClient) Stats() (*mc.Statistics, error) { return nil, mc.ErrNoStats } |
+ |
+func (*boundMemcacheClient) translateErr(err error) error { |
+ switch err { |
+ case memcache.ErrCacheMiss: |
+ return mc.ErrCacheMiss |
+ case memcache.ErrCASConflict: |
+ return mc.ErrCASConflict |
+ case memcache.ErrNotStored: |
+ return mc.ErrNotStored |
+ case memcache.ErrServerError: |
+ return mc.ErrServerError |
+ case memcache.ErrNoStats: |
+ return mc.ErrNoStats |
+ default: |
+ return err |
+ } |
+} |