Chromium Code Reviews| Index: impl/memory/memcache.go |
| diff --git a/impl/memory/memcache.go b/impl/memory/memcache.go |
| index 99617c0ccf03fb9aa2758e1ce052ae88bbfa2abf..6dbd70143d93df069862286bc2ab46d8c3412ab1 100644 |
| --- a/impl/memory/memcache.go |
| +++ b/impl/memory/memcache.go |
| @@ -5,20 +5,20 @@ |
| package memory |
| import ( |
| + "encoding/binary" |
| "sync" |
| "time" |
| "golang.org/x/net/context" |
| - "github.com/luci/gae/impl/dummy" |
| mc "github.com/luci/gae/service/memcache" |
| "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/errors" |
| ) |
| type mcItem struct { |
| key string |
| value []byte |
| - object interface{} |
| flags uint32 |
| expiration time.Duration |
| @@ -29,7 +29,6 @@ var _ mc.Item = (*mcItem)(nil) |
| func (m *mcItem) Key() string { return m.key } |
| func (m *mcItem) Value() []byte { return m.value } |
| -func (m *mcItem) Object() interface{} { return m.object } |
| func (m *mcItem) Flags() uint32 { return m.flags } |
| func (m *mcItem) Expiration() time.Duration { return m.expiration } |
| @@ -41,10 +40,6 @@ func (m *mcItem) SetValue(val []byte) mc.Item { |
| m.value = val |
| return m |
| } |
| -func (m *mcItem) SetObject(obj interface{}) mc.Item { |
| - m.object = obj |
| - return m |
| -} |
| func (m *mcItem) SetFlags(flg uint32) mc.Item { |
| m.flags = flg |
| return m |
| @@ -54,30 +49,97 @@ func (m *mcItem) SetExpiration(exp time.Duration) mc.Item { |
| return m |
| } |
| -func (m *mcItem) duplicate() *mcItem { |
| +func (m *mcItem) SetAll(other mc.Item) { |
| + *m = *other.(*mcItem).duplicate(false) |
| +} |
| + |
| +func (m *mcItem) duplicate(deep bool) *mcItem { |
| ret := mcItem{} |
| ret = *m |
| - ret.value = make([]byte, len(m.value)) |
| - copy(ret.value, m.value) |
| + if deep { |
| + ret.value = make([]byte, len(m.value)) |
| + copy(ret.value, m.value) |
| + } |
| return &ret |
| } |
| type memcacheData struct { |
| - lock sync.Mutex |
| + lock sync.RWMutex |
| items map[string]*mcItem |
| casID uint64 |
| + |
| + stats mc.Statistics |
| +} |
| + |
| +func (m *memcacheData) mkItemLocked(now time.Time, i mc.Item) (ret *mcItem) { |
| + m.casID++ |
| + |
| + var exp time.Duration |
|
dnj
2015/08/03 22:37:25
Maybe:
exp := i.Expiration()
if exp != 0 {
exp =
iannucci
2015/08/04 01:21:20
https://github.com/golang/appengine/blob/master/me
|
| + if i.Expiration() != 0 { |
| + exp = time.Duration(now.Add(i.Expiration()).UnixNano()) |
| + } |
| + newItem := mcItem{ |
| + key: i.Key(), |
| + flags: i.Flags(), |
| + expiration: exp, |
| + value: i.Value(), |
| + CasID: m.casID, |
| + } |
| + return newItem.duplicate(true) |
|
dnj
2015/08/03 22:37:25
This seems like a weird way of doing this. Allocat
iannucci
2015/08/04 01:21:20
Done
|
| +} |
| + |
| +func (m *memcacheData) setItemLocked(now time.Time, i mc.Item) { |
| + if cur, ok := m.items[i.Key()]; ok { |
| + m.stats.Items-- |
| + m.stats.Bytes -= uint64(len(cur.value)) |
| + } |
| + m.stats.Items++ |
| + m.stats.Bytes += uint64(len(i.Value())) |
| + m.items[i.Key()] = m.mkItemLocked(now, i) |
|
dnj
2015/08/03 22:37:25
Let "m.items" be allowed to be nil and test/init h
iannucci
2015/08/04 01:21:20
Why?
|
| +} |
| + |
| +func (m *memcacheData) delItemLocked(k string) { |
| + if itm, ok := m.items[k]; ok { |
| + m.stats.Items-- |
| + m.stats.Bytes -= uint64(len(itm.value)) |
| + delete(m.items, k) |
| + } |
| +} |
| + |
| +func (m *memcacheData) reset() { |
| + m.stats = mc.Statistics{} |
| + m.items = map[string]*mcItem{} |
|
dnj
2015/08/03 22:37:25
Should just "nil" it here.
iannucci
2015/08/04 01:21:20
Why?
dnj (Google)
2015/08/04 03:48:03
No reason to keep allocated empty maps around.
iannucci
2015/08/04 03:58:33
yeah, but if it doesn't have data in it, then you'
dnj
2015/08/04 18:01:16
Makes sense :)
|
| +} |
| + |
| +func (m *memcacheData) hasLocked(now time.Time, key string) bool { |
|
dnj
2015/08/03 22:37:25
I read this as "has locked?". Maybe rename, "hasIt
iannucci
2015/08/04 01:21:20
Done.
|
| + ret, ok := m.items[key] |
| + if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(now.UnixNano()) { |
|
dnj
2015/08/03 22:37:24
Yeah ret.Expiration() really should just be a time
iannucci
2015/08/04 01:21:20
I could do that in another CL. It'll cause more in
dnj (Google)
2015/08/04 03:48:03
Storing nanosecond since epoch in a time.Duration
iannucci
2015/08/04 03:58:33
Yes, that's fine, but the sdk memcache API uses ti
dnj
2015/08/04 18:01:16
Acknowledged.
|
| + m.delItemLocked(key) |
| + return false |
| + } |
| + return ok |
| +} |
| + |
| +func (m *memcacheData) retrieveLocked(now time.Time, key string) (*mcItem, error) { |
| + if !m.hasLocked(now, key) { |
| + m.stats.Misses++ |
| + return nil, mc.ErrCacheMiss |
| + } |
| + |
| + ret := m.items[key] |
| + m.stats.Hits++ |
| + m.stats.ByteHits += uint64(len(ret.value)) |
| + return ret, nil |
| } |
| // memcacheImpl binds the current connection's memcache data to an |
| // implementation of {gae.Memcache, gae.Testable}. |
| type memcacheImpl struct { |
| - mc.Interface |
| - |
| data *memcacheData |
| ctx context.Context |
| } |
| -var _ mc.Interface = (*memcacheImpl)(nil) |
| +var _ mc.RawInterface = (*memcacheImpl)(nil) |
| // useMC adds a gae.Memcache implementation to context, accessible |
| // by gae.GetMC(c) |
| @@ -85,7 +147,7 @@ func useMC(c context.Context) context.Context { |
| lck := sync.Mutex{} |
| mcdMap := map[string]*memcacheData{} |
| - return mc.SetFactory(c, func(ic context.Context) mc.Interface { |
| + return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface { |
| lck.Lock() |
| defer lck.Unlock() |
| @@ -97,106 +159,149 @@ func useMC(c context.Context) context.Context { |
| } |
| return &memcacheImpl{ |
| - dummy.Memcache(), |
| mcd, |
| ic, |
| } |
| }) |
| } |
| -func (m *memcacheImpl) mkItemLocked(i mc.Item) (ret *mcItem) { |
| - m.data.casID++ |
| - |
| - var exp time.Duration |
| - if i.Expiration() != 0 { |
| - exp = time.Duration(clock.Now(m.ctx).Add(i.Expiration()).UnixNano()) |
| - } |
| - newItem := mcItem{ |
| - key: i.Key(), |
| - flags: i.Flags(), |
| - expiration: exp, |
| - value: i.Value(), |
| - CasID: m.data.casID, |
| - } |
| - return newItem.duplicate() |
| -} |
| - |
| func (m *memcacheImpl) NewItem(key string) mc.Item { |
| return &mcItem{key: key} |
| } |
| -// Add implements context.MCSingleReadWriter.Add. |
| -func (m *memcacheImpl) Add(i mc.Item) error { |
| +func (m *memcacheImpl) AddMulti(items []mc.Item, cb mc.RawCB) error { |
| m.data.lock.Lock() |
| defer m.data.lock.Unlock() |
| - if _, ok := m.retrieveLocked(i.Key()); !ok { |
| - m.data.items[i.Key()] = m.mkItemLocked(i) |
| - return nil |
| + for _, itm := range items { |
| + now := clock.Now(m.ctx) |
|
dnj
2015/08/03 22:37:25
Do this once outside of the lock.
iannucci
2015/08/04 01:21:20
Done.
|
| + if !m.data.hasLocked(now, itm.Key()) { |
| + m.data.setItemLocked(now, itm) |
| + cb(nil) |
| + } else { |
| + cb(mc.ErrNotStored) |
| + } |
| } |
| - return mc.ErrNotStored |
| + return nil |
| } |
| -// CompareAndSwap implements context.MCSingleReadWriter.CompareAndSwap. |
| -func (m *memcacheImpl) CompareAndSwap(item mc.Item) error { |
| +func (m *memcacheImpl) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error { |
| m.data.lock.Lock() |
| defer m.data.lock.Unlock() |
| - if cur, ok := m.retrieveLocked(item.Key()); ok { |
| - casid := uint64(0) |
| - if mi, ok := item.(*mcItem); ok && mi != nil { |
| - casid = mi.CasID |
| - } |
| + for _, itm := range items { |
| + now := clock.Now(m.ctx) |
|
dnj
2015/08/03 22:37:25
Do this once outside of the lock.
iannucci
2015/08/04 01:21:20
Done.
|
| + err := error(nil) |
| + cur := (*mcItem)(nil) |
| + if cur, err = m.data.retrieveLocked(now, itm.Key()); err == nil { |
| + casid := uint64(0) |
| + if mi, ok := itm.(*mcItem); ok && mi != nil { |
| + casid = mi.CasID |
| + } |
| - if cur.CasID == casid { |
| - m.data.items[item.Key()] = m.mkItemLocked(item) |
| + if cur.CasID == casid { |
| + m.data.setItemLocked(now, itm) |
| + } else { |
| + err = mc.ErrCASConflict |
| + } |
| } else { |
| - return mc.ErrCASConflict |
| + err = mc.ErrNotStored |
| } |
| - } else { |
| - return mc.ErrNotStored |
| + cb(err) |
| } |
| + |
| return nil |
| } |
| -// Set implements context.MCSingleReadWriter.Set. |
| -func (m *memcacheImpl) Set(i mc.Item) error { |
| +func (m *memcacheImpl) SetMulti(items []mc.Item, cb mc.RawCB) error { |
| m.data.lock.Lock() |
| defer m.data.lock.Unlock() |
| - m.data.items[i.Key()] = m.mkItemLocked(i) |
| + for _, itm := range items { |
| + m.data.setItemLocked(clock.Now(m.ctx), itm) |
|
dnj
2015/08/03 22:37:25
Get "now" once, outside of the lock/loop.
iannucci
2015/08/04 01:21:20
Done.
|
| + cb(nil) |
|
dnj
2015/08/03 22:37:25
Consider setting the items within the lock, but do
iannucci
2015/08/04 01:21:20
Done.
|
| + } |
| + return nil |
| +} |
| + |
| +func (m *memcacheImpl) GetMulti(keys []string, cb mc.RawItemCB) error { |
| + m.data.lock.RLock() |
| + defer m.data.lock.RUnlock() |
| + |
| + for _, k := range keys { |
| + itm := (mc.Item)(nil) |
| + val, err := m.data.retrieveLocked(clock.Now(m.ctx), k) |
|
dnj
2015/08/03 22:37:25
Get "now" once, outside of the lock/loop.
iannucci
2015/08/04 01:21:20
Done.
|
| + if err == nil { |
| + itm = val.duplicate(true).SetExpiration(0) |
| + } |
| + cb(itm, err) |
| + } |
| + |
| return nil |
| } |
| -// Get implements context.MCSingleReadWriter.Get. |
| -func (m *memcacheImpl) Get(key string) (itm mc.Item, err error) { |
| +func (m *memcacheImpl) DeleteMulti(keys []string, cb mc.RawCB) error { |
| m.data.lock.Lock() |
| defer m.data.lock.Unlock() |
| - if val, ok := m.retrieveLocked(key); ok { |
| - itm = val.duplicate().SetExpiration(0) |
| - } else { |
| - err = mc.ErrCacheMiss |
| + |
| + for _, k := range keys { |
| + _, err := m.data.retrieveLocked(clock.Now(m.ctx), k) |
| + if err == nil { |
| + m.data.delItemLocked(k) |
| + } |
| + cb(err) |
| } |
| - return |
| + |
| + return nil |
| } |
| -// Delete implements context.MCSingleReadWriter.Delete. |
| -func (m *memcacheImpl) Delete(key string) error { |
| +func (m *memcacheImpl) Flush() error { |
| m.data.lock.Lock() |
| defer m.data.lock.Unlock() |
| - if _, ok := m.retrieveLocked(key); ok { |
| - delete(m.data.items, key) |
| - return nil |
| - } |
| - return mc.ErrCacheMiss |
| + m.data.reset() |
| + return nil |
| } |
| -func (m *memcacheImpl) retrieveLocked(key string) (*mcItem, bool) { |
| - ret, ok := m.data.items[key] |
| - if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(clock.Now(m.ctx).UnixNano()) { |
| - ret = nil |
| - ok = false |
| - delete(m.data.items, key) |
| +func (m *memcacheImpl) Increment(key string, delta int64, initialValue *uint64) (uint64, error) { |
| + m.data.lock.RLock() |
| + defer m.data.lock.RUnlock() |
| + |
| + now := clock.Now(m.ctx) |
|
dnj
2015/08/03 22:37:25
Do this outside of the lock.
iannucci
2015/08/04 01:21:20
Done.
|
| + |
| + cur := uint64(0) |
| + if initialValue == nil { |
| + curItm, err := m.data.retrieveLocked(now, key) |
| + if err != nil { |
| + return 0, err |
| + } |
| + if len(curItm.value) != 8 { |
| + return 0, errors.New("memcache Increment: got invalid current value") |
| + } |
| + cur = binary.LittleEndian.Uint64(curItm.value) |
| + } else { |
| + cur = *initialValue |
| + } |
| + if delta < 0 { |
| + if uint64(-delta) > cur { |
| + cur = 0 |
| + } else { |
| + cur -= uint64(-delta) |
| + } |
| + } else { |
| + cur += uint64(delta) |
| } |
| - return ret, ok |
| + |
| + newval := make([]byte, 8) |
| + binary.LittleEndian.PutUint64(newval, cur) |
| + m.data.setItemLocked(now, m.NewItem(key).SetValue(newval)) |
|
dnj
2015/08/03 22:37:25
This isn't safe to do with just an RLock.
iannucci
2015/08/04 01:21:20
oops, typo
|
| + |
| + return cur, nil |
| +} |
| + |
| +func (m *memcacheImpl) Stats() (*mc.Statistics, error) { |
| + m.data.lock.RLock() |
| + defer m.data.lock.RUnlock() |
| + |
| + ret := m.data.stats |
| + return &ret, nil |
| } |