Index: impl/memory/memcache.go |
diff --git a/impl/memory/memcache.go b/impl/memory/memcache.go |
index 99617c0ccf03fb9aa2758e1ce052ae88bbfa2abf..1be03861ce851fdd81cb44a290cb522ead0b91d9 100644 |
--- a/impl/memory/memcache.go |
+++ b/impl/memory/memcache.go |
@@ -5,20 +5,20 @@ |
package memory |
import ( |
+ "encoding/binary" |
"sync" |
"time" |
"golang.org/x/net/context" |
- "github.com/luci/gae/impl/dummy" |
mc "github.com/luci/gae/service/memcache" |
"github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/errors" |
) |
type mcItem struct { |
key string |
value []byte |
- object interface{} |
flags uint32 |
expiration time.Duration |
@@ -29,7 +29,6 @@ var _ mc.Item = (*mcItem)(nil) |
func (m *mcItem) Key() string { return m.key } |
func (m *mcItem) Value() []byte { return m.value } |
-func (m *mcItem) Object() interface{} { return m.object } |
func (m *mcItem) Flags() uint32 { return m.flags } |
func (m *mcItem) Expiration() time.Duration { return m.expiration } |
@@ -41,10 +40,6 @@ func (m *mcItem) SetValue(val []byte) mc.Item { |
m.value = val |
return m |
} |
-func (m *mcItem) SetObject(obj interface{}) mc.Item { |
- m.object = obj |
- return m |
-} |
func (m *mcItem) SetFlags(flg uint32) mc.Item { |
m.flags = flg |
return m |
@@ -54,30 +49,98 @@ func (m *mcItem) SetExpiration(exp time.Duration) mc.Item { |
return m |
} |
-func (m *mcItem) duplicate() *mcItem { |
+func (m *mcItem) SetAll(other mc.Item) { |
+ *m = *other.(*mcItem).duplicate(false) |
+} |
+ |
+func (m *mcItem) duplicate(deep bool) *mcItem { |
ret := mcItem{} |
ret = *m |
- ret.value = make([]byte, len(m.value)) |
- copy(ret.value, m.value) |
+ if deep { |
+ ret.value = make([]byte, len(m.value)) |
+ copy(ret.value, m.value) |
+ } |
return &ret |
} |
type memcacheData struct { |
- lock sync.Mutex |
+ lock sync.RWMutex |
items map[string]*mcItem |
casID uint64 |
+ |
+ stats mc.Statistics |
+} |
+ |
+func (m *memcacheData) mkItemLocked(now time.Time, i mc.Item) (ret *mcItem) { |
+ m.casID++ |
+ |
+ var exp time.Duration |
+ if i.Expiration() != 0 { |
+ exp = time.Duration(now.Add(i.Expiration()).UnixNano()) |
+ } |
+ value := make([]byte, len(i.Value())) |
+ copy(value, i.Value()) |
+ return &mcItem{ |
+ key: i.Key(), |
+ flags: i.Flags(), |
+ expiration: exp, |
+ value: value, |
+ CasID: m.casID, |
+ } |
+} |
+ |
+func (m *memcacheData) setItemLocked(now time.Time, i mc.Item) { |
+ if cur, ok := m.items[i.Key()]; ok { |
+ m.stats.Items-- |
+ m.stats.Bytes -= uint64(len(cur.value)) |
+ } |
+ m.stats.Items++ |
+ m.stats.Bytes += uint64(len(i.Value())) |
+ m.items[i.Key()] = m.mkItemLocked(now, i) |
+} |
+ |
+func (m *memcacheData) delItemLocked(k string) { |
+ if itm, ok := m.items[k]; ok { |
+ m.stats.Items-- |
+ m.stats.Bytes -= uint64(len(itm.value)) |
+ delete(m.items, k) |
+ } |
+} |
+ |
+func (m *memcacheData) reset() { |
+ m.stats = mc.Statistics{} |
+ m.items = map[string]*mcItem{} |
+} |
+ |
+func (m *memcacheData) hasItemLocked(now time.Time, key string) bool { |
+ ret, ok := m.items[key] |
+ if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(now.UnixNano()) { |
+ m.delItemLocked(key) |
+ return false |
+ } |
+ return ok |
+} |
+ |
+func (m *memcacheData) retrieveLocked(now time.Time, key string) (*mcItem, error) { |
+ if !m.hasItemLocked(now, key) { |
+ m.stats.Misses++ |
+ return nil, mc.ErrCacheMiss |
+ } |
+ |
+ ret := m.items[key] |
+ m.stats.Hits++ |
+ m.stats.ByteHits += uint64(len(ret.value)) |
+ return ret, nil |
} |
// memcacheImpl binds the current connection's memcache data to an |
// implementation of {gae.Memcache, gae.Testable}. |
type memcacheImpl struct { |
- mc.Interface |
- |
data *memcacheData |
ctx context.Context |
} |
-var _ mc.Interface = (*memcacheImpl)(nil) |
+var _ mc.RawInterface = (*memcacheImpl)(nil) |
// useMC adds a gae.Memcache implementation to context, accessible |
// by gae.GetMC(c) |
@@ -85,7 +148,7 @@ func useMC(c context.Context) context.Context { |
lck := sync.Mutex{} |
mcdMap := map[string]*memcacheData{} |
- return mc.SetFactory(c, func(ic context.Context) mc.Interface { |
+ return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface { |
lck.Lock() |
defer lck.Unlock() |
@@ -97,106 +160,180 @@ func useMC(c context.Context) context.Context { |
} |
return &memcacheImpl{ |
- dummy.Memcache(), |
mcd, |
ic, |
} |
}) |
} |
-func (m *memcacheImpl) mkItemLocked(i mc.Item) (ret *mcItem) { |
- m.data.casID++ |
+func (m *memcacheImpl) NewItem(key string) mc.Item { |
+ return &mcItem{key: key} |
+} |
- var exp time.Duration |
- if i.Expiration() != 0 { |
- exp = time.Duration(clock.Now(m.ctx).Add(i.Expiration()).UnixNano()) |
+func doCBs(items []mc.Item, cb mc.RawCB, inner func(mc.Item) error) { |
+ // This weird construction is so that we: |
+ // - don't take the lock for the entire multi operation, since it could imply |
+ // false atomicity. |
+ // - don't allow cb to block the actual batch operation, since that would |
+ // allow binding in ways that aren't possible under the real |
+ // implementation (like a recursive deadlock) |
+ errs := make([]error, len(items)) |
+ for i, itm := range items { |
+ errs[i] = inner(itm) |
} |
- newItem := mcItem{ |
- key: i.Key(), |
- flags: i.Flags(), |
- expiration: exp, |
- value: i.Value(), |
- CasID: m.data.casID, |
+ for _, e := range errs { |
+ cb(e) |
} |
- return newItem.duplicate() |
} |
-func (m *memcacheImpl) NewItem(key string) mc.Item { |
- return &mcItem{key: key} |
+func (m *memcacheImpl) AddMulti(items []mc.Item, cb mc.RawCB) error { |
+ now := clock.Now(m.ctx) |
+ doCBs(items, cb, func(itm mc.Item) error { |
+ m.data.lock.Lock() |
+ defer m.data.lock.Unlock() |
+ if !m.data.hasItemLocked(now, itm.Key()) { |
+ m.data.setItemLocked(now, itm) |
+ return nil |
+ } else { |
+ return (mc.ErrNotStored) |
+ } |
+ }) |
+ return nil |
} |
-// Add implements context.MCSingleReadWriter.Add. |
-func (m *memcacheImpl) Add(i mc.Item) error { |
- m.data.lock.Lock() |
- defer m.data.lock.Unlock() |
+func (m *memcacheImpl) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error { |
+ now := clock.Now(m.ctx) |
+ doCBs(items, cb, func(itm mc.Item) error { |
+ m.data.lock.Lock() |
+ defer m.data.lock.Unlock() |
+ |
+ if cur, err := m.data.retrieveLocked(now, itm.Key()); err == nil { |
+ casid := uint64(0) |
+ if mi, ok := itm.(*mcItem); ok && mi != nil { |
+ casid = mi.CasID |
+ } |
+ |
+ if cur.CasID == casid { |
+ m.data.setItemLocked(now, itm) |
+ } else { |
+ return mc.ErrCASConflict |
+ } |
+ return nil |
+ } |
+ return mc.ErrNotStored |
+ }) |
+ return nil |
+} |
- if _, ok := m.retrieveLocked(i.Key()); !ok { |
- m.data.items[i.Key()] = m.mkItemLocked(i) |
+func (m *memcacheImpl) SetMulti(items []mc.Item, cb mc.RawCB) error { |
+ now := clock.Now(m.ctx) |
+ doCBs(items, cb, func(itm mc.Item) error { |
+ m.data.lock.Lock() |
+ defer m.data.lock.Unlock() |
+ m.data.setItemLocked(now, itm) |
return nil |
+ }) |
+ return nil |
+} |
+ |
+func (m *memcacheImpl) GetMulti(keys []string, cb mc.RawItemCB) error { |
+ now := clock.Now(m.ctx) |
+ |
+ itms := make([]mc.Item, len(keys)) |
+ errs := make([]error, len(keys)) |
+ |
+ for i, k := range keys { |
+ itms[i], errs[i] = func() (mc.Item, error) { |
+ m.data.lock.RLock() |
+ defer m.data.lock.RUnlock() |
+ val, err := m.data.retrieveLocked(now, k) |
+ if err != nil { |
+ return nil, err |
+ } |
+ return val.duplicate(true).SetExpiration(0), nil |
+ }() |
} |
- return mc.ErrNotStored |
+ |
+ for i, itm := range itms { |
+ cb(itm, errs[i]) |
+ } |
+ |
+ return nil |
} |
-// CompareAndSwap implements context.MCSingleReadWriter.CompareAndSwap. |
-func (m *memcacheImpl) CompareAndSwap(item mc.Item) error { |
- m.data.lock.Lock() |
- defer m.data.lock.Unlock() |
+func (m *memcacheImpl) DeleteMulti(keys []string, cb mc.RawCB) error { |
+ now := clock.Now(m.ctx) |
- if cur, ok := m.retrieveLocked(item.Key()); ok { |
- casid := uint64(0) |
- if mi, ok := item.(*mcItem); ok && mi != nil { |
- casid = mi.CasID |
- } |
+ errs := make([]error, len(keys)) |
- if cur.CasID == casid { |
- m.data.items[item.Key()] = m.mkItemLocked(item) |
- } else { |
- return mc.ErrCASConflict |
- } |
- } else { |
- return mc.ErrNotStored |
+ for i, k := range keys { |
+ errs[i] = func() error { |
+ m.data.lock.Lock() |
+ defer m.data.lock.Unlock() |
+ _, err := m.data.retrieveLocked(now, k) |
+ if err != nil { |
+ return err |
+ } |
+ m.data.delItemLocked(k) |
+ return nil |
+ }() |
} |
+ |
+ for _, e := range errs { |
+ cb(e) |
+ } |
+ |
return nil |
} |
-// Set implements context.MCSingleReadWriter.Set. |
-func (m *memcacheImpl) Set(i mc.Item) error { |
+func (m *memcacheImpl) Flush() error { |
m.data.lock.Lock() |
defer m.data.lock.Unlock() |
- m.data.items[i.Key()] = m.mkItemLocked(i) |
+ |
+ m.data.reset() |
return nil |
} |
-// Get implements context.MCSingleReadWriter.Get. |
-func (m *memcacheImpl) Get(key string) (itm mc.Item, err error) { |
+func (m *memcacheImpl) Increment(key string, delta int64, initialValue *uint64) (uint64, error) { |
+ now := clock.Now(m.ctx) |
+ |
m.data.lock.Lock() |
defer m.data.lock.Unlock() |
- if val, ok := m.retrieveLocked(key); ok { |
- itm = val.duplicate().SetExpiration(0) |
+ |
+ cur := uint64(0) |
+ if initialValue == nil { |
+ curItm, err := m.data.retrieveLocked(now, key) |
+ if err != nil { |
+ return 0, err |
+ } |
+ if len(curItm.value) != 8 { |
+ return 0, errors.New("memcache Increment: got invalid current value") |
+ } |
+ cur = binary.LittleEndian.Uint64(curItm.value) |
} else { |
- err = mc.ErrCacheMiss |
+ cur = *initialValue |
+ } |
+ if delta < 0 { |
+ if uint64(-delta) > cur { |
+ cur = 0 |
+ } else { |
+ cur -= uint64(-delta) |
+ } |
+ } else { |
+ cur += uint64(delta) |
} |
- return |
-} |
-// Delete implements context.MCSingleReadWriter.Delete. |
-func (m *memcacheImpl) Delete(key string) error { |
- m.data.lock.Lock() |
- defer m.data.lock.Unlock() |
+ newval := make([]byte, 8) |
+ binary.LittleEndian.PutUint64(newval, cur) |
+ m.data.setItemLocked(now, m.NewItem(key).SetValue(newval)) |
- if _, ok := m.retrieveLocked(key); ok { |
- delete(m.data.items, key) |
- return nil |
- } |
- return mc.ErrCacheMiss |
+ return cur, nil |
} |
-func (m *memcacheImpl) retrieveLocked(key string) (*mcItem, bool) { |
- ret, ok := m.data.items[key] |
- if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(clock.Now(m.ctx).UnixNano()) { |
- ret = nil |
- ok = false |
- delete(m.data.items, key) |
- } |
- return ret, ok |
+func (m *memcacheImpl) Stats() (*mc.Statistics, error) { |
+ m.data.lock.RLock() |
+ defer m.data.lock.RUnlock() |
+ |
+ ret := m.data.stats |
+ return &ret, nil |
} |