Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1804)

Unified Diff: impl/memory/memcache.go

Issue 1270063003: Make the rest of the services have a similar raw/user interface structure. (Closed) Base URL: https://github.com/luci/gae.git@add_datastore
Patch Set: address comments Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « impl/dummy/dummy_test.go ('k') | impl/memory/memcache_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: impl/memory/memcache.go
diff --git a/impl/memory/memcache.go b/impl/memory/memcache.go
index 99617c0ccf03fb9aa2758e1ce052ae88bbfa2abf..1be03861ce851fdd81cb44a290cb522ead0b91d9 100644
--- a/impl/memory/memcache.go
+++ b/impl/memory/memcache.go
@@ -5,20 +5,20 @@
package memory
import (
+ "encoding/binary"
"sync"
"time"
"golang.org/x/net/context"
- "github.com/luci/gae/impl/dummy"
mc "github.com/luci/gae/service/memcache"
"github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/errors"
)
type mcItem struct {
key string
value []byte
- object interface{}
flags uint32
expiration time.Duration
@@ -29,7 +29,6 @@ var _ mc.Item = (*mcItem)(nil)
func (m *mcItem) Key() string { return m.key }
func (m *mcItem) Value() []byte { return m.value }
-func (m *mcItem) Object() interface{} { return m.object }
func (m *mcItem) Flags() uint32 { return m.flags }
func (m *mcItem) Expiration() time.Duration { return m.expiration }
@@ -41,10 +40,6 @@ func (m *mcItem) SetValue(val []byte) mc.Item {
m.value = val
return m
}
-func (m *mcItem) SetObject(obj interface{}) mc.Item {
- m.object = obj
- return m
-}
func (m *mcItem) SetFlags(flg uint32) mc.Item {
m.flags = flg
return m
@@ -54,30 +49,98 @@ func (m *mcItem) SetExpiration(exp time.Duration) mc.Item {
return m
}
-func (m *mcItem) duplicate() *mcItem {
+func (m *mcItem) SetAll(other mc.Item) {
+ *m = *other.(*mcItem).duplicate(false)
+}
+
+func (m *mcItem) duplicate(deep bool) *mcItem {
ret := mcItem{}
ret = *m
- ret.value = make([]byte, len(m.value))
- copy(ret.value, m.value)
+ if deep {
+ ret.value = make([]byte, len(m.value))
+ copy(ret.value, m.value)
+ }
return &ret
}
type memcacheData struct {
- lock sync.Mutex
+ lock sync.RWMutex
items map[string]*mcItem
casID uint64
+
+ stats mc.Statistics
+}
+
+func (m *memcacheData) mkItemLocked(now time.Time, i mc.Item) (ret *mcItem) {
+ m.casID++
+
+ var exp time.Duration
+ if i.Expiration() != 0 {
+ exp = time.Duration(now.Add(i.Expiration()).UnixNano())
+ }
+ value := make([]byte, len(i.Value()))
+ copy(value, i.Value())
+ return &mcItem{
+ key: i.Key(),
+ flags: i.Flags(),
+ expiration: exp,
+ value: value,
+ CasID: m.casID,
+ }
+}
+
+func (m *memcacheData) setItemLocked(now time.Time, i mc.Item) {
+ if cur, ok := m.items[i.Key()]; ok {
+ m.stats.Items--
+ m.stats.Bytes -= uint64(len(cur.value))
+ }
+ m.stats.Items++
+ m.stats.Bytes += uint64(len(i.Value()))
+ m.items[i.Key()] = m.mkItemLocked(now, i)
+}
+
+func (m *memcacheData) delItemLocked(k string) {
+ if itm, ok := m.items[k]; ok {
+ m.stats.Items--
+ m.stats.Bytes -= uint64(len(itm.value))
+ delete(m.items, k)
+ }
+}
+
+func (m *memcacheData) reset() {
+ m.stats = mc.Statistics{}
+ m.items = map[string]*mcItem{}
+}
+
+func (m *memcacheData) hasItemLocked(now time.Time, key string) bool {
+ ret, ok := m.items[key]
+ if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(now.UnixNano()) {
+ m.delItemLocked(key)
+ return false
+ }
+ return ok
+}
+
+func (m *memcacheData) retrieveLocked(now time.Time, key string) (*mcItem, error) {
+ if !m.hasItemLocked(now, key) {
+ m.stats.Misses++
+ return nil, mc.ErrCacheMiss
+ }
+
+ ret := m.items[key]
+ m.stats.Hits++
+ m.stats.ByteHits += uint64(len(ret.value))
+ return ret, nil
}
// memcacheImpl binds the current connection's memcache data to an
// implementation of {gae.Memcache, gae.Testable}.
type memcacheImpl struct {
- mc.Interface
-
data *memcacheData
ctx context.Context
}
-var _ mc.Interface = (*memcacheImpl)(nil)
+var _ mc.RawInterface = (*memcacheImpl)(nil)
// useMC adds a gae.Memcache implementation to context, accessible
// by gae.GetMC(c)
@@ -85,7 +148,7 @@ func useMC(c context.Context) context.Context {
lck := sync.Mutex{}
mcdMap := map[string]*memcacheData{}
- return mc.SetFactory(c, func(ic context.Context) mc.Interface {
+ return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface {
lck.Lock()
defer lck.Unlock()
@@ -97,106 +160,180 @@ func useMC(c context.Context) context.Context {
}
return &memcacheImpl{
- dummy.Memcache(),
mcd,
ic,
}
})
}
-func (m *memcacheImpl) mkItemLocked(i mc.Item) (ret *mcItem) {
- m.data.casID++
+func (m *memcacheImpl) NewItem(key string) mc.Item {
+ return &mcItem{key: key}
+}
- var exp time.Duration
- if i.Expiration() != 0 {
- exp = time.Duration(clock.Now(m.ctx).Add(i.Expiration()).UnixNano())
+func doCBs(items []mc.Item, cb mc.RawCB, inner func(mc.Item) error) {
+ // This weird construction is so that we:
+ // - don't take the lock for the entire multi operation, since it could imply
+ // false atomicity.
+ // - don't allow cb to block the actual batch operation, since that would
+ // allow binding in ways that aren't possible under the real
+ // implementation (like a recursive deadlock)
+ errs := make([]error, len(items))
+ for i, itm := range items {
+ errs[i] = inner(itm)
}
- newItem := mcItem{
- key: i.Key(),
- flags: i.Flags(),
- expiration: exp,
- value: i.Value(),
- CasID: m.data.casID,
+ for _, e := range errs {
+ cb(e)
}
- return newItem.duplicate()
}
-func (m *memcacheImpl) NewItem(key string) mc.Item {
- return &mcItem{key: key}
+func (m *memcacheImpl) AddMulti(items []mc.Item, cb mc.RawCB) error {
+ now := clock.Now(m.ctx)
+ doCBs(items, cb, func(itm mc.Item) error {
+ m.data.lock.Lock()
+ defer m.data.lock.Unlock()
+ if !m.data.hasItemLocked(now, itm.Key()) {
+ m.data.setItemLocked(now, itm)
+ return nil
+ } else {
+ return (mc.ErrNotStored)
+ }
+ })
+ return nil
}
-// Add implements context.MCSingleReadWriter.Add.
-func (m *memcacheImpl) Add(i mc.Item) error {
- m.data.lock.Lock()
- defer m.data.lock.Unlock()
+func (m *memcacheImpl) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error {
+ now := clock.Now(m.ctx)
+ doCBs(items, cb, func(itm mc.Item) error {
+ m.data.lock.Lock()
+ defer m.data.lock.Unlock()
+
+ if cur, err := m.data.retrieveLocked(now, itm.Key()); err == nil {
+ casid := uint64(0)
+ if mi, ok := itm.(*mcItem); ok && mi != nil {
+ casid = mi.CasID
+ }
+
+ if cur.CasID == casid {
+ m.data.setItemLocked(now, itm)
+ } else {
+ return mc.ErrCASConflict
+ }
+ return nil
+ }
+ return mc.ErrNotStored
+ })
+ return nil
+}
- if _, ok := m.retrieveLocked(i.Key()); !ok {
- m.data.items[i.Key()] = m.mkItemLocked(i)
+func (m *memcacheImpl) SetMulti(items []mc.Item, cb mc.RawCB) error {
+ now := clock.Now(m.ctx)
+ doCBs(items, cb, func(itm mc.Item) error {
+ m.data.lock.Lock()
+ defer m.data.lock.Unlock()
+ m.data.setItemLocked(now, itm)
return nil
+ })
+ return nil
+}
+
+func (m *memcacheImpl) GetMulti(keys []string, cb mc.RawItemCB) error {
+ now := clock.Now(m.ctx)
+
+ itms := make([]mc.Item, len(keys))
+ errs := make([]error, len(keys))
+
+ for i, k := range keys {
+ itms[i], errs[i] = func() (mc.Item, error) {
+ m.data.lock.RLock()
+ defer m.data.lock.RUnlock()
+ val, err := m.data.retrieveLocked(now, k)
+ if err != nil {
+ return nil, err
+ }
+ return val.duplicate(true).SetExpiration(0), nil
+ }()
}
- return mc.ErrNotStored
+
+ for i, itm := range itms {
+ cb(itm, errs[i])
+ }
+
+ return nil
}
-// CompareAndSwap implements context.MCSingleReadWriter.CompareAndSwap.
-func (m *memcacheImpl) CompareAndSwap(item mc.Item) error {
- m.data.lock.Lock()
- defer m.data.lock.Unlock()
+func (m *memcacheImpl) DeleteMulti(keys []string, cb mc.RawCB) error {
+ now := clock.Now(m.ctx)
- if cur, ok := m.retrieveLocked(item.Key()); ok {
- casid := uint64(0)
- if mi, ok := item.(*mcItem); ok && mi != nil {
- casid = mi.CasID
- }
+ errs := make([]error, len(keys))
- if cur.CasID == casid {
- m.data.items[item.Key()] = m.mkItemLocked(item)
- } else {
- return mc.ErrCASConflict
- }
- } else {
- return mc.ErrNotStored
+ for i, k := range keys {
+ errs[i] = func() error {
+ m.data.lock.Lock()
+ defer m.data.lock.Unlock()
+ _, err := m.data.retrieveLocked(now, k)
+ if err != nil {
+ return err
+ }
+ m.data.delItemLocked(k)
+ return nil
+ }()
}
+
+ for _, e := range errs {
+ cb(e)
+ }
+
return nil
}
-// Set implements context.MCSingleReadWriter.Set.
-func (m *memcacheImpl) Set(i mc.Item) error {
+func (m *memcacheImpl) Flush() error {
m.data.lock.Lock()
defer m.data.lock.Unlock()
- m.data.items[i.Key()] = m.mkItemLocked(i)
+
+ m.data.reset()
return nil
}
-// Get implements context.MCSingleReadWriter.Get.
-func (m *memcacheImpl) Get(key string) (itm mc.Item, err error) {
+func (m *memcacheImpl) Increment(key string, delta int64, initialValue *uint64) (uint64, error) {
+ now := clock.Now(m.ctx)
+
m.data.lock.Lock()
defer m.data.lock.Unlock()
- if val, ok := m.retrieveLocked(key); ok {
- itm = val.duplicate().SetExpiration(0)
+
+ cur := uint64(0)
+ if initialValue == nil {
+ curItm, err := m.data.retrieveLocked(now, key)
+ if err != nil {
+ return 0, err
+ }
+ if len(curItm.value) != 8 {
+ return 0, errors.New("memcache Increment: got invalid current value")
+ }
+ cur = binary.LittleEndian.Uint64(curItm.value)
} else {
- err = mc.ErrCacheMiss
+ cur = *initialValue
+ }
+ if delta < 0 {
+ if uint64(-delta) > cur {
+ cur = 0
+ } else {
+ cur -= uint64(-delta)
+ }
+ } else {
+ cur += uint64(delta)
}
- return
-}
-// Delete implements context.MCSingleReadWriter.Delete.
-func (m *memcacheImpl) Delete(key string) error {
- m.data.lock.Lock()
- defer m.data.lock.Unlock()
+ newval := make([]byte, 8)
+ binary.LittleEndian.PutUint64(newval, cur)
+ m.data.setItemLocked(now, m.NewItem(key).SetValue(newval))
- if _, ok := m.retrieveLocked(key); ok {
- delete(m.data.items, key)
- return nil
- }
- return mc.ErrCacheMiss
+ return cur, nil
}
-func (m *memcacheImpl) retrieveLocked(key string) (*mcItem, bool) {
- ret, ok := m.data.items[key]
- if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(clock.Now(m.ctx).UnixNano()) {
- ret = nil
- ok = false
- delete(m.data.items, key)
- }
- return ret, ok
+func (m *memcacheImpl) Stats() (*mc.Statistics, error) {
+ m.data.lock.RLock()
+ defer m.data.lock.RUnlock()
+
+ ret := m.data.stats
+ return &ret, nil
}
« no previous file with comments | « impl/dummy/dummy_test.go ('k') | impl/memory/memcache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698