Index: impl/memory/memcache.go |
diff --git a/impl/memory/memcache.go b/impl/memory/memcache.go |
index 99617c0ccf03fb9aa2758e1ce052ae88bbfa2abf..6dbd70143d93df069862286bc2ab46d8c3412ab1 100644 |
--- a/impl/memory/memcache.go |
+++ b/impl/memory/memcache.go |
@@ -5,20 +5,20 @@ |
package memory |
import ( |
+ "encoding/binary" |
"sync" |
"time" |
"golang.org/x/net/context" |
- "github.com/luci/gae/impl/dummy" |
mc "github.com/luci/gae/service/memcache" |
"github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/errors" |
) |
type mcItem struct { |
key string |
value []byte |
- object interface{} |
flags uint32 |
expiration time.Duration |
@@ -29,7 +29,6 @@ var _ mc.Item = (*mcItem)(nil) |
func (m *mcItem) Key() string { return m.key } |
func (m *mcItem) Value() []byte { return m.value } |
-func (m *mcItem) Object() interface{} { return m.object } |
func (m *mcItem) Flags() uint32 { return m.flags } |
func (m *mcItem) Expiration() time.Duration { return m.expiration } |
@@ -41,10 +40,6 @@ func (m *mcItem) SetValue(val []byte) mc.Item { |
m.value = val |
return m |
} |
-func (m *mcItem) SetObject(obj interface{}) mc.Item { |
- m.object = obj |
- return m |
-} |
func (m *mcItem) SetFlags(flg uint32) mc.Item { |
m.flags = flg |
return m |
@@ -54,30 +49,97 @@ func (m *mcItem) SetExpiration(exp time.Duration) mc.Item { |
return m |
} |
-func (m *mcItem) duplicate() *mcItem { |
+func (m *mcItem) SetAll(other mc.Item) { |
+ *m = *other.(*mcItem).duplicate(false) |
+} |
+ |
+func (m *mcItem) duplicate(deep bool) *mcItem { |
ret := mcItem{} |
ret = *m |
- ret.value = make([]byte, len(m.value)) |
- copy(ret.value, m.value) |
+ if deep { |
+ ret.value = make([]byte, len(m.value)) |
+ copy(ret.value, m.value) |
+ } |
return &ret |
} |
type memcacheData struct { |
- lock sync.Mutex |
+ lock sync.RWMutex |
items map[string]*mcItem |
casID uint64 |
+ |
+ stats mc.Statistics |
+} |
+ |
+func (m *memcacheData) mkItemLocked(now time.Time, i mc.Item) (ret *mcItem) { |
+ m.casID++ |
+ |
+ var exp time.Duration |
dnj
2015/08/03 22:37:25
Maybe:
exp := i.Expiration()
if exp != 0 {
exp =
iannucci
2015/08/04 01:21:20
https://github.com/golang/appengine/blob/master/me
|
+ if i.Expiration() != 0 { |
+ exp = time.Duration(now.Add(i.Expiration()).UnixNano()) |
+ } |
+ newItem := mcItem{ |
+ key: i.Key(), |
+ flags: i.Flags(), |
+ expiration: exp, |
+ value: i.Value(), |
+ CasID: m.casID, |
+ } |
+ return newItem.duplicate(true) |
dnj
2015/08/03 22:37:25
This seems like a weird way of doing this. Allocat
iannucci
2015/08/04 01:21:20
Done
|
+} |
+ |
+func (m *memcacheData) setItemLocked(now time.Time, i mc.Item) { |
+ if cur, ok := m.items[i.Key()]; ok { |
+ m.stats.Items-- |
+ m.stats.Bytes -= uint64(len(cur.value)) |
+ } |
+ m.stats.Items++ |
+ m.stats.Bytes += uint64(len(i.Value())) |
+ m.items[i.Key()] = m.mkItemLocked(now, i) |
dnj
2015/08/03 22:37:25
Let "m.items" be allowed to be nil and test/init h
iannucci
2015/08/04 01:21:20
Why?
|
+} |
+ |
+func (m *memcacheData) delItemLocked(k string) { |
+ if itm, ok := m.items[k]; ok { |
+ m.stats.Items-- |
+ m.stats.Bytes -= uint64(len(itm.value)) |
+ delete(m.items, k) |
+ } |
+} |
+ |
+func (m *memcacheData) reset() { |
+ m.stats = mc.Statistics{} |
+ m.items = map[string]*mcItem{} |
dnj
2015/08/03 22:37:25
Should just "nil" it here.
iannucci
2015/08/04 01:21:20
Why?
dnj (Google)
2015/08/04 03:48:03
No reason to keep allocated empty maps around.
iannucci
2015/08/04 03:58:33
yeah, but if it doesn't have data in it, then you'
dnj
2015/08/04 18:01:16
Makes sense :)
|
+} |
+ |
+func (m *memcacheData) hasLocked(now time.Time, key string) bool { |
dnj
2015/08/03 22:37:25
I read this as "has locked?". Maybe rename, "hasIt
iannucci
2015/08/04 01:21:20
Done.
|
+ ret, ok := m.items[key] |
+ if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(now.UnixNano()) { |
dnj
2015/08/03 22:37:24
Yeah ret.Expiration() really should just be a time
iannucci
2015/08/04 01:21:20
I could do that in another CL. It'll cause more in
dnj (Google)
2015/08/04 03:48:03
Storing nanosecond since epoch in a time.Duration
iannucci
2015/08/04 03:58:33
Yes, that's fine, but the sdk memcache API uses ti
dnj
2015/08/04 18:01:16
Acknowledged.
|
+ m.delItemLocked(key) |
+ return false |
+ } |
+ return ok |
+} |
+ |
+func (m *memcacheData) retrieveLocked(now time.Time, key string) (*mcItem, error) { |
+ if !m.hasLocked(now, key) { |
+ m.stats.Misses++ |
+ return nil, mc.ErrCacheMiss |
+ } |
+ |
+ ret := m.items[key] |
+ m.stats.Hits++ |
+ m.stats.ByteHits += uint64(len(ret.value)) |
+ return ret, nil |
} |
// memcacheImpl binds the current connection's memcache data to an |
// implementation of {gae.Memcache, gae.Testable}. |
type memcacheImpl struct { |
- mc.Interface |
- |
data *memcacheData |
ctx context.Context |
} |
-var _ mc.Interface = (*memcacheImpl)(nil) |
+var _ mc.RawInterface = (*memcacheImpl)(nil) |
// useMC adds a gae.Memcache implementation to context, accessible |
// by gae.GetMC(c) |
@@ -85,7 +147,7 @@ func useMC(c context.Context) context.Context { |
lck := sync.Mutex{} |
mcdMap := map[string]*memcacheData{} |
- return mc.SetFactory(c, func(ic context.Context) mc.Interface { |
+ return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface { |
lck.Lock() |
defer lck.Unlock() |
@@ -97,106 +159,149 @@ func useMC(c context.Context) context.Context { |
} |
return &memcacheImpl{ |
- dummy.Memcache(), |
mcd, |
ic, |
} |
}) |
} |
-func (m *memcacheImpl) mkItemLocked(i mc.Item) (ret *mcItem) { |
- m.data.casID++ |
- |
- var exp time.Duration |
- if i.Expiration() != 0 { |
- exp = time.Duration(clock.Now(m.ctx).Add(i.Expiration()).UnixNano()) |
- } |
- newItem := mcItem{ |
- key: i.Key(), |
- flags: i.Flags(), |
- expiration: exp, |
- value: i.Value(), |
- CasID: m.data.casID, |
- } |
- return newItem.duplicate() |
-} |
- |
func (m *memcacheImpl) NewItem(key string) mc.Item { |
return &mcItem{key: key} |
} |
-// Add implements context.MCSingleReadWriter.Add. |
-func (m *memcacheImpl) Add(i mc.Item) error { |
+func (m *memcacheImpl) AddMulti(items []mc.Item, cb mc.RawCB) error { |
m.data.lock.Lock() |
defer m.data.lock.Unlock() |
- if _, ok := m.retrieveLocked(i.Key()); !ok { |
- m.data.items[i.Key()] = m.mkItemLocked(i) |
- return nil |
+ for _, itm := range items { |
+ now := clock.Now(m.ctx) |
dnj
2015/08/03 22:37:25
Do this once outside of the lock.
iannucci
2015/08/04 01:21:20
Done.
|
+ if !m.data.hasLocked(now, itm.Key()) { |
+ m.data.setItemLocked(now, itm) |
+ cb(nil) |
+ } else { |
+ cb(mc.ErrNotStored) |
+ } |
} |
- return mc.ErrNotStored |
+ return nil |
} |
-// CompareAndSwap implements context.MCSingleReadWriter.CompareAndSwap. |
-func (m *memcacheImpl) CompareAndSwap(item mc.Item) error { |
+func (m *memcacheImpl) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error { |
m.data.lock.Lock() |
defer m.data.lock.Unlock() |
- if cur, ok := m.retrieveLocked(item.Key()); ok { |
- casid := uint64(0) |
- if mi, ok := item.(*mcItem); ok && mi != nil { |
- casid = mi.CasID |
- } |
+ for _, itm := range items { |
+ now := clock.Now(m.ctx) |
dnj
2015/08/03 22:37:25
Do this once outside of the lock.
iannucci
2015/08/04 01:21:20
Done.
|
+ err := error(nil) |
+ cur := (*mcItem)(nil) |
+ if cur, err = m.data.retrieveLocked(now, itm.Key()); err == nil { |
+ casid := uint64(0) |
+ if mi, ok := itm.(*mcItem); ok && mi != nil { |
+ casid = mi.CasID |
+ } |
- if cur.CasID == casid { |
- m.data.items[item.Key()] = m.mkItemLocked(item) |
+ if cur.CasID == casid { |
+ m.data.setItemLocked(now, itm) |
+ } else { |
+ err = mc.ErrCASConflict |
+ } |
} else { |
- return mc.ErrCASConflict |
+ err = mc.ErrNotStored |
} |
- } else { |
- return mc.ErrNotStored |
+ cb(err) |
} |
+ |
return nil |
} |
-// Set implements context.MCSingleReadWriter.Set. |
-func (m *memcacheImpl) Set(i mc.Item) error { |
+func (m *memcacheImpl) SetMulti(items []mc.Item, cb mc.RawCB) error { |
m.data.lock.Lock() |
defer m.data.lock.Unlock() |
- m.data.items[i.Key()] = m.mkItemLocked(i) |
+ for _, itm := range items { |
+ m.data.setItemLocked(clock.Now(m.ctx), itm) |
dnj
2015/08/03 22:37:25
Get "now" once, outside of the lock/loop.
iannucci
2015/08/04 01:21:20
Done.
|
+ cb(nil) |
dnj
2015/08/03 22:37:25
Consider setting the items within the lock, but do
iannucci
2015/08/04 01:21:20
Done.
|
+ } |
+ return nil |
+} |
+ |
+func (m *memcacheImpl) GetMulti(keys []string, cb mc.RawItemCB) error { |
+ m.data.lock.RLock() |
+ defer m.data.lock.RUnlock() |
+ |
+ for _, k := range keys { |
+ itm := (mc.Item)(nil) |
+ val, err := m.data.retrieveLocked(clock.Now(m.ctx), k) |
dnj
2015/08/03 22:37:25
Get "now" once, outside of the lock/loop.
iannucci
2015/08/04 01:21:20
Done.
|
+ if err == nil { |
+ itm = val.duplicate(true).SetExpiration(0) |
+ } |
+ cb(itm, err) |
+ } |
+ |
return nil |
} |
-// Get implements context.MCSingleReadWriter.Get. |
-func (m *memcacheImpl) Get(key string) (itm mc.Item, err error) { |
+func (m *memcacheImpl) DeleteMulti(keys []string, cb mc.RawCB) error { |
m.data.lock.Lock() |
defer m.data.lock.Unlock() |
- if val, ok := m.retrieveLocked(key); ok { |
- itm = val.duplicate().SetExpiration(0) |
- } else { |
- err = mc.ErrCacheMiss |
+ |
+ for _, k := range keys { |
+ _, err := m.data.retrieveLocked(clock.Now(m.ctx), k) |
+ if err == nil { |
+ m.data.delItemLocked(k) |
+ } |
+ cb(err) |
} |
- return |
+ |
+ return nil |
} |
-// Delete implements context.MCSingleReadWriter.Delete. |
-func (m *memcacheImpl) Delete(key string) error { |
+func (m *memcacheImpl) Flush() error { |
m.data.lock.Lock() |
defer m.data.lock.Unlock() |
- if _, ok := m.retrieveLocked(key); ok { |
- delete(m.data.items, key) |
- return nil |
- } |
- return mc.ErrCacheMiss |
+ m.data.reset() |
+ return nil |
} |
-func (m *memcacheImpl) retrieveLocked(key string) (*mcItem, bool) { |
- ret, ok := m.data.items[key] |
- if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(clock.Now(m.ctx).UnixNano()) { |
- ret = nil |
- ok = false |
- delete(m.data.items, key) |
+func (m *memcacheImpl) Increment(key string, delta int64, initialValue *uint64) (uint64, error) { |
+ m.data.lock.RLock() |
+ defer m.data.lock.RUnlock() |
+ |
+ now := clock.Now(m.ctx) |
dnj
2015/08/03 22:37:25
Do this outside of the lock.
iannucci
2015/08/04 01:21:20
Done.
|
+ |
+ cur := uint64(0) |
+ if initialValue == nil { |
+ curItm, err := m.data.retrieveLocked(now, key) |
+ if err != nil { |
+ return 0, err |
+ } |
+ if len(curItm.value) != 8 { |
+ return 0, errors.New("memcache Increment: got invalid current value") |
+ } |
+ cur = binary.LittleEndian.Uint64(curItm.value) |
+ } else { |
+ cur = *initialValue |
+ } |
+ if delta < 0 { |
+ if uint64(-delta) > cur { |
+ cur = 0 |
+ } else { |
+ cur -= uint64(-delta) |
+ } |
+ } else { |
+ cur += uint64(delta) |
} |
- return ret, ok |
+ |
+ newval := make([]byte, 8) |
+ binary.LittleEndian.PutUint64(newval, cur) |
+ m.data.setItemLocked(now, m.NewItem(key).SetValue(newval)) |
dnj
2015/08/03 22:37:25
This isn't safe to do with just an RLock.
iannucci
2015/08/04 01:21:20
oops, typo
|
+ |
+ return cur, nil |
+} |
+ |
+func (m *memcacheImpl) Stats() (*mc.Statistics, error) { |
+ m.data.lock.RLock() |
+ defer m.data.lock.RUnlock() |
+ |
+ ret := m.data.stats |
+ return &ret, nil |
} |