Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2560)

Unified Diff: impl/cloud/memcache.go

Issue 2513253002: impl/cloud: Add support for "memcached" memcache. (Closed)
Patch Set: Add test for key hasing. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « impl/cloud/datastore_test.go ('k') | impl/cloud/memcache_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: impl/cloud/memcache.go
diff --git a/impl/cloud/memcache.go b/impl/cloud/memcache.go
new file mode 100644
index 0000000000000000000000000000000000000000..15933a89df13e8bbf0299724c9ec3e5b48bcdd47
--- /dev/null
+++ b/impl/cloud/memcache.go
@@ -0,0 +1,296 @@
+// Copyright 2016 The LUCI Authors. All rights reserved.
+// Use of this source code is governed under the Apache License, Version 2.0
+// that can be found in the LICENSE file.
+
+package cloud
+
+import (
+ "crypto/sha256"
+ "encoding/hex"
+ "strconv"
+ "time"
+
+ "github.com/luci/gae/service/info"
+ mc "github.com/luci/gae/service/memcache"
+
+ "github.com/bradfitz/gomemcache/memcache"
+ "golang.org/x/net/context"
+)
+
+const (
+ // memcacheKeyPrefix is the common prefix prepended to memcached keys created
+ // by this package. It is intended to ensure that keys do not conflict with
+ // other users of the service.
+ memcacheKeyPrefix = "github.com/luci/gae/impl/cloud:"
+
+ // keyHashSizeThreshold is a threshold for key hashing. If the key's length
+ // exceeds this threshold, the key will be hashed and the hash used in its
+ // place with the memcache service.
+ //
+ // This is an implementation detail, but will be visible to the user in the
+ // Key field of the memcache entry on retrieval.
+ keyHashSizeThreshold = 250
+)
+
+// memcacheClient is a "service/memcache" implementation built on top of a
+// "memcached" client connection.
+//
+// Because "memcached" has no concept of a namespace, we differentiate memcache
+// entries by prepending "memcacheKeyPrefix:SHA256(namespace):" to each key.
+type memcacheClient struct {
+ client *memcache.Client
+}
+
+func (m *memcacheClient) use(c context.Context) context.Context {
+ return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface {
+ return bindMemcacheClient(m, info.GetNamespace(ic))
+ })
+}
+
+type memcacheItem struct {
+ native *memcache.Item
+}
+
+func (it *memcacheItem) Key() string { return it.native.Key }
+func (it *memcacheItem) Value() []byte { return it.native.Value }
+func (it *memcacheItem) Flags() uint32 { return it.native.Flags }
+func (it *memcacheItem) Expiration() time.Duration {
+ return time.Duration(it.native.Expiration) * time.Second
+}
+
+func (it *memcacheItem) SetKey(v string) mc.Item {
+ it.native.Key = v
+ return it
+}
+
+func (it *memcacheItem) SetValue(v []byte) mc.Item {
+ it.native.Value = v
+ return it
+}
+
+func (it *memcacheItem) SetFlags(v uint32) mc.Item {
+ it.native.Flags = v
+ return it
+}
+
+func (it *memcacheItem) SetExpiration(v time.Duration) mc.Item {
+ it.native.Expiration = int32(v.Seconds())
+ return it
+}
+
+func (it *memcacheItem) SetAll(other mc.Item) {
+ origKey := it.native.Key
+
+ var on memcache.Item
+ if other != nil {
+ on = *(other.(*memcacheItem).native)
+ }
+ it.native = &on
+ it.native.Key = origKey
+}
+
+func hashBytes(b []byte) string {
+ hash := sha256.Sum256(b)
+ return hex.EncodeToString(hash[:])
+}
+
+type boundMemcacheClient struct {
+ *memcacheClient
+ keyPrefix string
+}
+
+func bindMemcacheClient(mc *memcacheClient, ns string) *boundMemcacheClient {
+ nsPrefix := hashBytes([]byte(ns))
+ return &boundMemcacheClient{
+ memcacheClient: mc,
+ keyPrefix: memcacheKeyPrefix + nsPrefix + ":",
+ }
+}
+
+func (*boundMemcacheClient) newMemcacheItem(nativeKey string) *memcacheItem {
+ return &memcacheItem{
+ native: &memcache.Item{
+ Key: nativeKey,
+ },
+ }
+}
+
+// makeKey constructs the actual key used with the memcache service. This
+// includes a service-specific prefix, the key's namespace, and the key itself.
+// If the key's length exceeds the keyHashSizeThreshold, the key will be stored
+// as a hash.
+func (bmc *boundMemcacheClient) makeKey(base string) string {
+ if len(base) > keyHashSizeThreshold {
+ base = hashBytes([]byte(base))
+ }
+ return bmc.keyPrefix + base
+}
+
+func (bmc *boundMemcacheClient) userKey(key string) string { return key[len(bmc.keyPrefix):] }
+
+func (bmc *boundMemcacheClient) nativeItem(itm mc.Item) *memcache.Item {
+ ni := *(itm.(*memcacheItem).native)
+ ni.Key = bmc.makeKey(ni.Key)
+ return &ni
+}
+
+func (bmc *boundMemcacheClient) NewItem(key string) mc.Item { return bmc.newMemcacheItem(key) }
+
+func (bmc *boundMemcacheClient) AddMulti(items []mc.Item, cb mc.RawCB) error {
+ for _, itm := range items {
+ err := bmc.client.Add(bmc.nativeItem(itm))
+ cb(bmc.translateErr(err))
+ }
+ return nil
+}
+
+func (bmc *boundMemcacheClient) SetMulti(items []mc.Item, cb mc.RawCB) error {
+ for _, itm := range items {
+ err := bmc.client.Set(bmc.nativeItem(itm))
+ cb(bmc.translateErr(err))
+ }
+ return nil
+}
+
+func (bmc *boundMemcacheClient) GetMulti(keys []string, cb mc.RawItemCB) error {
+ nativeKeys := make([]string, len(keys))
+ for i, key := range keys {
+ nativeKeys[i] = bmc.makeKey(key)
+ }
+
+ itemMap, err := bmc.client.GetMulti(nativeKeys)
+ if err != nil {
+ return bmc.translateErr(err)
+ }
+
+ // Translate the item keys back to user keys.
+ for _, v := range itemMap {
+ v.Key = bmc.userKey(v.Key)
+ }
+
+ for _, k := range nativeKeys {
+ if it := itemMap[k]; it != nil {
+ cb(&memcacheItem{native: it}, nil)
+ } else {
+ cb(nil, mc.ErrCacheMiss)
+ }
+ }
+ return nil
+}
+
+func (bmc *boundMemcacheClient) DeleteMulti(keys []string, cb mc.RawCB) error {
+ for _, k := range keys {
+ err := bmc.client.Delete(bmc.makeKey(k))
+ cb(bmc.translateErr(err))
+ }
+ return nil
+}
+
+func (bmc *boundMemcacheClient) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error {
+ for _, itm := range items {
+ err := bmc.client.CompareAndSwap(bmc.nativeItem(itm))
+ cb(bmc.translateErr(err))
+ }
+ return nil
+}
+
+func (bmc *boundMemcacheClient) Increment(key string, delta int64, initialValue *uint64) (uint64, error) {
+ // key is now the native key (namespaced).
+ key = bmc.makeKey(key)
+
+ op := func() (newValue uint64, err error) {
+ switch {
+ case delta > 0:
+ newValue, err = bmc.client.Increment(key, uint64(delta))
+ case delta < 0:
+ newValue, err = bmc.client.Decrement(key, uint64(-delta))
+ default:
+ // We don't want to change the value, but we want to return ErrNotStored
+ // if the value doesn't exist. Use Get.
+ _, err = bmc.client.Get(key)
+ }
+ err = bmc.translateErr(err)
+ return
+ }
+
+ if initialValue == nil {
+ return op()
+ }
+
+ // The Memcache service doesn't have an "IncrementExisting" equivalent. We
+ // will emulate this with other memcache operations, using Add to set the
+ // initial value if appropriate.
+ var (
+ itm *memcacheItem
+ iv = *initialValue
+ )
+ for {
+ // Perform compare-and-swap.
+ nv, err := op()
+ if err != mc.ErrCacheMiss {
+ return nv, err
+ }
+
+ // The value doesn't exist. Use "Add" to set the initial value. We will
+ // calculate the "initial value" as if delta were applied so we can do this
+ // in one operation.
+ //
+ // We only need to do this once per invocation, so we will use "itm == nil"
+ // as a sentinel for uninitialized.
+ if itm == nil {
+ // Overflow wraps around (to zero), and underflow is capped at 0.
+ if delta < 0 {
+ udelta := uint64(-delta)
+ if udelta >= iv {
+ // Would underflow, cap at 0.
+ iv = 0
+ } else {
+ iv -= udelta
+ }
+ } else {
+ // Apply delta. This will automatically wrap on overflow.
+ iv += uint64(delta)
+ }
+
+ itm = bmc.newMemcacheItem(key)
+ itm.SetValue([]byte(strconv.FormatUint(iv, 10)))
+ }
+ switch err := bmc.client.Add(itm.native); err {
+ case nil:
+ // Item was successfully set.
+ return iv, nil
+
+ case mc.ErrNotStored:
+ // Something else set it in between "op" and "Add". Try "op" again.
+ break
+
+ default:
+ return 0, err
+ }
+ }
+}
+
+func (bmc *boundMemcacheClient) Flush() error {
+ // Unfortunately there's not really a good way to flush just a single
+ // namespace, so Flush will flush all memcache.
+ return bmc.translateErr(bmc.client.FlushAll())
+}
+
+func (bmc *boundMemcacheClient) Stats() (*mc.Statistics, error) { return nil, mc.ErrNoStats }
+
+func (*boundMemcacheClient) translateErr(err error) error {
+ switch err {
+ case memcache.ErrCacheMiss:
+ return mc.ErrCacheMiss
+ case memcache.ErrCASConflict:
+ return mc.ErrCASConflict
+ case memcache.ErrNotStored:
+ return mc.ErrNotStored
+ case memcache.ErrServerError:
+ return mc.ErrServerError
+ case memcache.ErrNoStats:
+ return mc.ErrNoStats
+ default:
+ return err
+ }
+}
« no previous file with comments | « impl/cloud/datastore_test.go ('k') | impl/cloud/memcache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698