OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. |
| 4 |
| 5 package cloud |
| 6 |
| 7 import ( |
| 8 "crypto/sha256" |
| 9 "encoding/hex" |
| 10 "strconv" |
| 11 "time" |
| 12 |
| 13 "github.com/luci/gae/service/info" |
| 14 mc "github.com/luci/gae/service/memcache" |
| 15 |
| 16 "github.com/bradfitz/gomemcache/memcache" |
| 17 "golang.org/x/net/context" |
| 18 ) |
| 19 |
| 20 const ( |
| 21 // memcacheKeyPrefix is the common prefix prepended to memcached keys cr
eated |
| 22 // by this package. It is intended to ensure that keys do not conflict w
ith |
| 23 // other users of the service. |
| 24 memcacheKeyPrefix = "github.com/luci/gae/impl/cloud:" |
| 25 |
| 26 // keyHashSizeThreshold is a threshold for key hashing. If the key's len
gth |
| 27 // exceeds this threshold, the key will be hashed and the hash used in i
ts |
| 28 // place with the memcache service. |
| 29 // |
| 30 // This is an implementation detail, but will be visible to the user in
the |
| 31 // Key field of the memcache entry on retrieval. |
| 32 keyHashSizeThreshold = 250 |
| 33 ) |
| 34 |
| 35 // memcacheClient is a "service/memcache" implementation built on top of a |
| 36 // "memcached" client connection. |
| 37 // |
| 38 // Because "memcached" has no concept of a namespace, we differentiate memcache |
| 39 // entries by prepending "memcacheKeyPrefix:SHA256(namespace):" to each key. |
| 40 type memcacheClient struct { |
| 41 client *memcache.Client |
| 42 } |
| 43 |
| 44 func (m *memcacheClient) use(c context.Context) context.Context { |
| 45 return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface { |
| 46 return bindMemcacheClient(m, info.GetNamespace(ic)) |
| 47 }) |
| 48 } |
| 49 |
| 50 type memcacheItem struct { |
| 51 native *memcache.Item |
| 52 } |
| 53 |
| 54 func (it *memcacheItem) Key() string { return it.native.Key } |
| 55 func (it *memcacheItem) Value() []byte { return it.native.Value } |
| 56 func (it *memcacheItem) Flags() uint32 { return it.native.Flags } |
| 57 func (it *memcacheItem) Expiration() time.Duration { |
| 58 return time.Duration(it.native.Expiration) * time.Second |
| 59 } |
| 60 |
| 61 func (it *memcacheItem) SetKey(v string) mc.Item { |
| 62 it.native.Key = v |
| 63 return it |
| 64 } |
| 65 |
| 66 func (it *memcacheItem) SetValue(v []byte) mc.Item { |
| 67 it.native.Value = v |
| 68 return it |
| 69 } |
| 70 |
| 71 func (it *memcacheItem) SetFlags(v uint32) mc.Item { |
| 72 it.native.Flags = v |
| 73 return it |
| 74 } |
| 75 |
| 76 func (it *memcacheItem) SetExpiration(v time.Duration) mc.Item { |
| 77 it.native.Expiration = int32(v.Seconds()) |
| 78 return it |
| 79 } |
| 80 |
| 81 func (it *memcacheItem) SetAll(other mc.Item) { |
| 82 origKey := it.native.Key |
| 83 |
| 84 var on memcache.Item |
| 85 if other != nil { |
| 86 on = *(other.(*memcacheItem).native) |
| 87 } |
| 88 it.native = &on |
| 89 it.native.Key = origKey |
| 90 } |
| 91 |
| 92 func hashBytes(b []byte) string { |
| 93 hash := sha256.Sum256(b) |
| 94 return hex.EncodeToString(hash[:]) |
| 95 } |
| 96 |
| 97 type boundMemcacheClient struct { |
| 98 *memcacheClient |
| 99 keyPrefix string |
| 100 } |
| 101 |
| 102 func bindMemcacheClient(mc *memcacheClient, ns string) *boundMemcacheClient { |
| 103 nsPrefix := hashBytes([]byte(ns)) |
| 104 return &boundMemcacheClient{ |
| 105 memcacheClient: mc, |
| 106 keyPrefix: memcacheKeyPrefix + nsPrefix + ":", |
| 107 } |
| 108 } |
| 109 |
| 110 func (*boundMemcacheClient) newMemcacheItem(nativeKey string) *memcacheItem { |
| 111 return &memcacheItem{ |
| 112 native: &memcache.Item{ |
| 113 Key: nativeKey, |
| 114 }, |
| 115 } |
| 116 } |
| 117 |
| 118 // makeKey constructs the actual key used with the memcache service. This |
| 119 // includes a service-specific prefix, the key's namespace, and the key itself. |
| 120 // If the key's length exceeds the keyHashSizeThreshold, the key will be stored |
| 121 // as a hash. |
| 122 func (bmc *boundMemcacheClient) makeKey(base string) string { |
| 123 if len(base) > keyHashSizeThreshold { |
| 124 base = hashBytes([]byte(base)) |
| 125 } |
| 126 return bmc.keyPrefix + base |
| 127 } |
| 128 |
| 129 func (bmc *boundMemcacheClient) userKey(key string) string { return key[len(bmc.
keyPrefix):] } |
| 130 |
| 131 func (bmc *boundMemcacheClient) nativeItem(itm mc.Item) *memcache.Item { |
| 132 ni := *(itm.(*memcacheItem).native) |
| 133 ni.Key = bmc.makeKey(ni.Key) |
| 134 return &ni |
| 135 } |
| 136 |
| 137 func (bmc *boundMemcacheClient) NewItem(key string) mc.Item { return bmc.newMemc
acheItem(key) } |
| 138 |
| 139 func (bmc *boundMemcacheClient) AddMulti(items []mc.Item, cb mc.RawCB) error { |
| 140 for _, itm := range items { |
| 141 err := bmc.client.Add(bmc.nativeItem(itm)) |
| 142 cb(bmc.translateErr(err)) |
| 143 } |
| 144 return nil |
| 145 } |
| 146 |
| 147 func (bmc *boundMemcacheClient) SetMulti(items []mc.Item, cb mc.RawCB) error { |
| 148 for _, itm := range items { |
| 149 err := bmc.client.Set(bmc.nativeItem(itm)) |
| 150 cb(bmc.translateErr(err)) |
| 151 } |
| 152 return nil |
| 153 } |
| 154 |
| 155 func (bmc *boundMemcacheClient) GetMulti(keys []string, cb mc.RawItemCB) error { |
| 156 nativeKeys := make([]string, len(keys)) |
| 157 for i, key := range keys { |
| 158 nativeKeys[i] = bmc.makeKey(key) |
| 159 } |
| 160 |
| 161 itemMap, err := bmc.client.GetMulti(nativeKeys) |
| 162 if err != nil { |
| 163 return bmc.translateErr(err) |
| 164 } |
| 165 |
| 166 // Translate the item keys back to user keys. |
| 167 for _, v := range itemMap { |
| 168 v.Key = bmc.userKey(v.Key) |
| 169 } |
| 170 |
| 171 for _, k := range nativeKeys { |
| 172 if it := itemMap[k]; it != nil { |
| 173 cb(&memcacheItem{native: it}, nil) |
| 174 } else { |
| 175 cb(nil, mc.ErrCacheMiss) |
| 176 } |
| 177 } |
| 178 return nil |
| 179 } |
| 180 |
| 181 func (bmc *boundMemcacheClient) DeleteMulti(keys []string, cb mc.RawCB) error { |
| 182 for _, k := range keys { |
| 183 err := bmc.client.Delete(bmc.makeKey(k)) |
| 184 cb(bmc.translateErr(err)) |
| 185 } |
| 186 return nil |
| 187 } |
| 188 |
| 189 func (bmc *boundMemcacheClient) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB
) error { |
| 190 for _, itm := range items { |
| 191 err := bmc.client.CompareAndSwap(bmc.nativeItem(itm)) |
| 192 cb(bmc.translateErr(err)) |
| 193 } |
| 194 return nil |
| 195 } |
| 196 |
| 197 func (bmc *boundMemcacheClient) Increment(key string, delta int64, initialValue
*uint64) (uint64, error) { |
| 198 // key is now the native key (namespaced). |
| 199 key = bmc.makeKey(key) |
| 200 |
| 201 op := func() (newValue uint64, err error) { |
| 202 switch { |
| 203 case delta > 0: |
| 204 newValue, err = bmc.client.Increment(key, uint64(delta)) |
| 205 case delta < 0: |
| 206 newValue, err = bmc.client.Decrement(key, uint64(-delta)
) |
| 207 default: |
| 208 // We don't want to change the value, but we want to ret
urn ErrNotStored |
| 209 // if the value doesn't exist. Use Get. |
| 210 _, err = bmc.client.Get(key) |
| 211 } |
| 212 err = bmc.translateErr(err) |
| 213 return |
| 214 } |
| 215 |
| 216 if initialValue == nil { |
| 217 return op() |
| 218 } |
| 219 |
| 220 // The Memcache service doesn't have an "IncrementExisting" equivalent.
We |
| 221 // will emulate this with other memcache operations, using Add to set th
e |
| 222 // initial value if appropriate. |
| 223 var ( |
| 224 itm *memcacheItem |
| 225 iv = *initialValue |
| 226 ) |
| 227 for { |
| 228 // Perform compare-and-swap. |
| 229 nv, err := op() |
| 230 if err != mc.ErrCacheMiss { |
| 231 return nv, err |
| 232 } |
| 233 |
| 234 // The value doesn't exist. Use "Add" to set the initial value.
We will |
| 235 // calculate the "initial value" as if delta were applied so we
can do this |
| 236 // in one operation. |
| 237 // |
| 238 // We only need to do this once per invocation, so we will use "
itm == nil" |
| 239 // as a sentinel for uninitialized. |
| 240 if itm == nil { |
| 241 // Overflow wraps around (to zero), and underflow is cap
ped at 0. |
| 242 if delta < 0 { |
| 243 udelta := uint64(-delta) |
| 244 if udelta >= iv { |
| 245 // Would underflow, cap at 0. |
| 246 iv = 0 |
| 247 } else { |
| 248 iv -= udelta |
| 249 } |
| 250 } else { |
| 251 // Apply delta. This will automatically wrap on
overflow. |
| 252 iv += uint64(delta) |
| 253 } |
| 254 |
| 255 itm = bmc.newMemcacheItem(key) |
| 256 itm.SetValue([]byte(strconv.FormatUint(iv, 10))) |
| 257 } |
| 258 switch err := bmc.client.Add(itm.native); err { |
| 259 case nil: |
| 260 // Item was successfully set. |
| 261 return iv, nil |
| 262 |
| 263 case mc.ErrNotStored: |
| 264 // Something else set it in between "op" and "Add". Try
"op" again. |
| 265 break |
| 266 |
| 267 default: |
| 268 return 0, err |
| 269 } |
| 270 } |
| 271 } |
| 272 |
| 273 func (bmc *boundMemcacheClient) Flush() error { |
| 274 // Unfortunately there's not really a good way to flush just a single |
| 275 // namespace, so Flush will flush all memcache. |
| 276 return bmc.translateErr(bmc.client.FlushAll()) |
| 277 } |
| 278 |
| 279 func (bmc *boundMemcacheClient) Stats() (*mc.Statistics, error) { return nil, mc
.ErrNoStats } |
| 280 |
| 281 func (*boundMemcacheClient) translateErr(err error) error { |
| 282 switch err { |
| 283 case memcache.ErrCacheMiss: |
| 284 return mc.ErrCacheMiss |
| 285 case memcache.ErrCASConflict: |
| 286 return mc.ErrCASConflict |
| 287 case memcache.ErrNotStored: |
| 288 return mc.ErrNotStored |
| 289 case memcache.ErrServerError: |
| 290 return mc.ErrServerError |
| 291 case memcache.ErrNoStats: |
| 292 return mc.ErrNoStats |
| 293 default: |
| 294 return err |
| 295 } |
| 296 } |
OLD | NEW |