| Index: impl/memory/memcache.go
|
| diff --git a/impl/memory/memcache.go b/impl/memory/memcache.go
|
| index 99617c0ccf03fb9aa2758e1ce052ae88bbfa2abf..1be03861ce851fdd81cb44a290cb522ead0b91d9 100644
|
| --- a/impl/memory/memcache.go
|
| +++ b/impl/memory/memcache.go
|
| @@ -5,20 +5,20 @@
|
| package memory
|
|
|
| import (
|
| + "encoding/binary"
|
| "sync"
|
| "time"
|
|
|
| "golang.org/x/net/context"
|
|
|
| - "github.com/luci/gae/impl/dummy"
|
| mc "github.com/luci/gae/service/memcache"
|
| "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/errors"
|
| )
|
|
|
| type mcItem struct {
|
| key string
|
| value []byte
|
| - object interface{}
|
| flags uint32
|
| expiration time.Duration
|
|
|
| @@ -29,7 +29,6 @@ var _ mc.Item = (*mcItem)(nil)
|
|
|
| func (m *mcItem) Key() string { return m.key }
|
| func (m *mcItem) Value() []byte { return m.value }
|
| -func (m *mcItem) Object() interface{} { return m.object }
|
| func (m *mcItem) Flags() uint32 { return m.flags }
|
| func (m *mcItem) Expiration() time.Duration { return m.expiration }
|
|
|
| @@ -41,10 +40,6 @@ func (m *mcItem) SetValue(val []byte) mc.Item {
|
| m.value = val
|
| return m
|
| }
|
| -func (m *mcItem) SetObject(obj interface{}) mc.Item {
|
| - m.object = obj
|
| - return m
|
| -}
|
| func (m *mcItem) SetFlags(flg uint32) mc.Item {
|
| m.flags = flg
|
| return m
|
| @@ -54,30 +49,98 @@ func (m *mcItem) SetExpiration(exp time.Duration) mc.Item {
|
| return m
|
| }
|
|
|
| -func (m *mcItem) duplicate() *mcItem {
|
| +func (m *mcItem) SetAll(other mc.Item) {
|
| + *m = *other.(*mcItem).duplicate(false)
|
| +}
|
| +
|
| +func (m *mcItem) duplicate(deep bool) *mcItem {
|
| ret := mcItem{}
|
| ret = *m
|
| - ret.value = make([]byte, len(m.value))
|
| - copy(ret.value, m.value)
|
| + if deep {
|
| + ret.value = make([]byte, len(m.value))
|
| + copy(ret.value, m.value)
|
| + }
|
| return &ret
|
| }
|
|
|
| type memcacheData struct {
|
| - lock sync.Mutex
|
| + lock sync.RWMutex
|
| items map[string]*mcItem
|
| casID uint64
|
| +
|
| + stats mc.Statistics
|
| +}
|
| +
|
| +func (m *memcacheData) mkItemLocked(now time.Time, i mc.Item) (ret *mcItem) {
|
| + m.casID++
|
| +
|
| + var exp time.Duration
|
| + if i.Expiration() != 0 {
|
| + exp = time.Duration(now.Add(i.Expiration()).UnixNano())
|
| + }
|
| + value := make([]byte, len(i.Value()))
|
| + copy(value, i.Value())
|
| + return &mcItem{
|
| + key: i.Key(),
|
| + flags: i.Flags(),
|
| + expiration: exp,
|
| + value: value,
|
| + CasID: m.casID,
|
| + }
|
| +}
|
| +
|
| +func (m *memcacheData) setItemLocked(now time.Time, i mc.Item) {
|
| + if cur, ok := m.items[i.Key()]; ok {
|
| + m.stats.Items--
|
| + m.stats.Bytes -= uint64(len(cur.value))
|
| + }
|
| + m.stats.Items++
|
| + m.stats.Bytes += uint64(len(i.Value()))
|
| + m.items[i.Key()] = m.mkItemLocked(now, i)
|
| +}
|
| +
|
| +func (m *memcacheData) delItemLocked(k string) {
|
| + if itm, ok := m.items[k]; ok {
|
| + m.stats.Items--
|
| + m.stats.Bytes -= uint64(len(itm.value))
|
| + delete(m.items, k)
|
| + }
|
| +}
|
| +
|
| +func (m *memcacheData) reset() {
|
| + m.stats = mc.Statistics{}
|
| + m.items = map[string]*mcItem{}
|
| +}
|
| +
|
| +func (m *memcacheData) hasItemLocked(now time.Time, key string) bool {
|
| + ret, ok := m.items[key]
|
| + if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(now.UnixNano()) {
|
| + m.delItemLocked(key)
|
| + return false
|
| + }
|
| + return ok
|
| +}
|
| +
|
| +func (m *memcacheData) retrieveLocked(now time.Time, key string) (*mcItem, error) {
|
| + if !m.hasItemLocked(now, key) {
|
| + m.stats.Misses++
|
| + return nil, mc.ErrCacheMiss
|
| + }
|
| +
|
| + ret := m.items[key]
|
| + m.stats.Hits++
|
| + m.stats.ByteHits += uint64(len(ret.value))
|
| + return ret, nil
|
| }
|
|
|
| // memcacheImpl binds the current connection's memcache data to an
|
| // implementation of {gae.Memcache, gae.Testable}.
|
| type memcacheImpl struct {
|
| - mc.Interface
|
| -
|
| data *memcacheData
|
| ctx context.Context
|
| }
|
|
|
| -var _ mc.Interface = (*memcacheImpl)(nil)
|
| +var _ mc.RawInterface = (*memcacheImpl)(nil)
|
|
|
| // useMC adds a gae.Memcache implementation to context, accessible
|
| // by gae.GetMC(c)
|
| @@ -85,7 +148,7 @@ func useMC(c context.Context) context.Context {
|
| lck := sync.Mutex{}
|
| mcdMap := map[string]*memcacheData{}
|
|
|
| - return mc.SetFactory(c, func(ic context.Context) mc.Interface {
|
| + return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface {
|
| lck.Lock()
|
| defer lck.Unlock()
|
|
|
| @@ -97,106 +160,180 @@ func useMC(c context.Context) context.Context {
|
| }
|
|
|
| return &memcacheImpl{
|
| - dummy.Memcache(),
|
| mcd,
|
| ic,
|
| }
|
| })
|
| }
|
|
|
| -func (m *memcacheImpl) mkItemLocked(i mc.Item) (ret *mcItem) {
|
| - m.data.casID++
|
| +func (m *memcacheImpl) NewItem(key string) mc.Item {
|
| + return &mcItem{key: key}
|
| +}
|
|
|
| - var exp time.Duration
|
| - if i.Expiration() != 0 {
|
| - exp = time.Duration(clock.Now(m.ctx).Add(i.Expiration()).UnixNano())
|
| +func doCBs(items []mc.Item, cb mc.RawCB, inner func(mc.Item) error) {
|
| + // This weird construction is so that we:
|
| + // - don't take the lock for the entire multi operation, since it could imply
|
| + // false atomicity.
|
| + // - don't allow cb to block the actual batch operation, since that would
|
| + // allow binding in ways that aren't possible under the real
|
| + // implementation (like a recursive deadlock)
|
| + errs := make([]error, len(items))
|
| + for i, itm := range items {
|
| + errs[i] = inner(itm)
|
| }
|
| - newItem := mcItem{
|
| - key: i.Key(),
|
| - flags: i.Flags(),
|
| - expiration: exp,
|
| - value: i.Value(),
|
| - CasID: m.data.casID,
|
| + for _, e := range errs {
|
| + cb(e)
|
| }
|
| - return newItem.duplicate()
|
| }
|
|
|
| -func (m *memcacheImpl) NewItem(key string) mc.Item {
|
| - return &mcItem{key: key}
|
| +func (m *memcacheImpl) AddMulti(items []mc.Item, cb mc.RawCB) error {
|
| + now := clock.Now(m.ctx)
|
| + doCBs(items, cb, func(itm mc.Item) error {
|
| + m.data.lock.Lock()
|
| + defer m.data.lock.Unlock()
|
| + if !m.data.hasItemLocked(now, itm.Key()) {
|
| + m.data.setItemLocked(now, itm)
|
| + return nil
|
| + } else {
|
| + return (mc.ErrNotStored)
|
| + }
|
| + })
|
| + return nil
|
| }
|
|
|
| -// Add implements context.MCSingleReadWriter.Add.
|
| -func (m *memcacheImpl) Add(i mc.Item) error {
|
| - m.data.lock.Lock()
|
| - defer m.data.lock.Unlock()
|
| +func (m *memcacheImpl) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error {
|
| + now := clock.Now(m.ctx)
|
| + doCBs(items, cb, func(itm mc.Item) error {
|
| + m.data.lock.Lock()
|
| + defer m.data.lock.Unlock()
|
| +
|
| + if cur, err := m.data.retrieveLocked(now, itm.Key()); err == nil {
|
| + casid := uint64(0)
|
| + if mi, ok := itm.(*mcItem); ok && mi != nil {
|
| + casid = mi.CasID
|
| + }
|
| +
|
| + if cur.CasID == casid {
|
| + m.data.setItemLocked(now, itm)
|
| + } else {
|
| + return mc.ErrCASConflict
|
| + }
|
| + return nil
|
| + }
|
| + return mc.ErrNotStored
|
| + })
|
| + return nil
|
| +}
|
|
|
| - if _, ok := m.retrieveLocked(i.Key()); !ok {
|
| - m.data.items[i.Key()] = m.mkItemLocked(i)
|
| +func (m *memcacheImpl) SetMulti(items []mc.Item, cb mc.RawCB) error {
|
| + now := clock.Now(m.ctx)
|
| + doCBs(items, cb, func(itm mc.Item) error {
|
| + m.data.lock.Lock()
|
| + defer m.data.lock.Unlock()
|
| + m.data.setItemLocked(now, itm)
|
| return nil
|
| + })
|
| + return nil
|
| +}
|
| +
|
| +func (m *memcacheImpl) GetMulti(keys []string, cb mc.RawItemCB) error {
|
| + now := clock.Now(m.ctx)
|
| +
|
| + itms := make([]mc.Item, len(keys))
|
| + errs := make([]error, len(keys))
|
| +
|
| + for i, k := range keys {
|
| + itms[i], errs[i] = func() (mc.Item, error) {
|
| + m.data.lock.RLock()
|
| + defer m.data.lock.RUnlock()
|
| + val, err := m.data.retrieveLocked(now, k)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| + return val.duplicate(true).SetExpiration(0), nil
|
| + }()
|
| }
|
| - return mc.ErrNotStored
|
| +
|
| + for i, itm := range itms {
|
| + cb(itm, errs[i])
|
| + }
|
| +
|
| + return nil
|
| }
|
|
|
| -// CompareAndSwap implements context.MCSingleReadWriter.CompareAndSwap.
|
| -func (m *memcacheImpl) CompareAndSwap(item mc.Item) error {
|
| - m.data.lock.Lock()
|
| - defer m.data.lock.Unlock()
|
| +func (m *memcacheImpl) DeleteMulti(keys []string, cb mc.RawCB) error {
|
| + now := clock.Now(m.ctx)
|
|
|
| - if cur, ok := m.retrieveLocked(item.Key()); ok {
|
| - casid := uint64(0)
|
| - if mi, ok := item.(*mcItem); ok && mi != nil {
|
| - casid = mi.CasID
|
| - }
|
| + errs := make([]error, len(keys))
|
|
|
| - if cur.CasID == casid {
|
| - m.data.items[item.Key()] = m.mkItemLocked(item)
|
| - } else {
|
| - return mc.ErrCASConflict
|
| - }
|
| - } else {
|
| - return mc.ErrNotStored
|
| + for i, k := range keys {
|
| + errs[i] = func() error {
|
| + m.data.lock.Lock()
|
| + defer m.data.lock.Unlock()
|
| + _, err := m.data.retrieveLocked(now, k)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + m.data.delItemLocked(k)
|
| + return nil
|
| + }()
|
| }
|
| +
|
| + for _, e := range errs {
|
| + cb(e)
|
| + }
|
| +
|
| return nil
|
| }
|
|
|
| -// Set implements context.MCSingleReadWriter.Set.
|
| -func (m *memcacheImpl) Set(i mc.Item) error {
|
| +func (m *memcacheImpl) Flush() error {
|
| m.data.lock.Lock()
|
| defer m.data.lock.Unlock()
|
| - m.data.items[i.Key()] = m.mkItemLocked(i)
|
| +
|
| + m.data.reset()
|
| return nil
|
| }
|
|
|
| -// Get implements context.MCSingleReadWriter.Get.
|
| -func (m *memcacheImpl) Get(key string) (itm mc.Item, err error) {
|
| +func (m *memcacheImpl) Increment(key string, delta int64, initialValue *uint64) (uint64, error) {
|
| + now := clock.Now(m.ctx)
|
| +
|
| m.data.lock.Lock()
|
| defer m.data.lock.Unlock()
|
| - if val, ok := m.retrieveLocked(key); ok {
|
| - itm = val.duplicate().SetExpiration(0)
|
| +
|
| + cur := uint64(0)
|
| + if initialValue == nil {
|
| + curItm, err := m.data.retrieveLocked(now, key)
|
| + if err != nil {
|
| + return 0, err
|
| + }
|
| + if len(curItm.value) != 8 {
|
| + return 0, errors.New("memcache Increment: got invalid current value")
|
| + }
|
| + cur = binary.LittleEndian.Uint64(curItm.value)
|
| } else {
|
| - err = mc.ErrCacheMiss
|
| + cur = *initialValue
|
| + }
|
| + if delta < 0 {
|
| + if uint64(-delta) > cur {
|
| + cur = 0
|
| + } else {
|
| + cur -= uint64(-delta)
|
| + }
|
| + } else {
|
| + cur += uint64(delta)
|
| }
|
| - return
|
| -}
|
|
|
| -// Delete implements context.MCSingleReadWriter.Delete.
|
| -func (m *memcacheImpl) Delete(key string) error {
|
| - m.data.lock.Lock()
|
| - defer m.data.lock.Unlock()
|
| + newval := make([]byte, 8)
|
| + binary.LittleEndian.PutUint64(newval, cur)
|
| + m.data.setItemLocked(now, m.NewItem(key).SetValue(newval))
|
|
|
| - if _, ok := m.retrieveLocked(key); ok {
|
| - delete(m.data.items, key)
|
| - return nil
|
| - }
|
| - return mc.ErrCacheMiss
|
| + return cur, nil
|
| }
|
|
|
| -func (m *memcacheImpl) retrieveLocked(key string) (*mcItem, bool) {
|
| - ret, ok := m.data.items[key]
|
| - if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(clock.Now(m.ctx).UnixNano()) {
|
| - ret = nil
|
| - ok = false
|
| - delete(m.data.items, key)
|
| - }
|
| - return ret, ok
|
| +func (m *memcacheImpl) Stats() (*mc.Statistics, error) {
|
| + m.data.lock.RLock()
|
| + defer m.data.lock.RUnlock()
|
| +
|
| + ret := m.data.stats
|
| + return &ret, nil
|
| }
|
|
|