Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(21)

Side by Side Diff: impl/memory/memcache.go

Issue 1270063003: Make the rest of the services have a similar raw/user interface structure. (Closed) Base URL: https://github.com/luci/gae.git@add_datastore
Patch Set: address comments Created 5 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « impl/dummy/dummy_test.go ('k') | impl/memory/memcache_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package memory 5 package memory
6 6
7 import ( 7 import (
8 "encoding/binary"
8 "sync" 9 "sync"
9 "time" 10 "time"
10 11
11 "golang.org/x/net/context" 12 "golang.org/x/net/context"
12 13
13 "github.com/luci/gae/impl/dummy"
14 mc "github.com/luci/gae/service/memcache" 14 mc "github.com/luci/gae/service/memcache"
15 "github.com/luci/luci-go/common/clock" 15 "github.com/luci/luci-go/common/clock"
16 "github.com/luci/luci-go/common/errors"
16 ) 17 )
17 18
18 type mcItem struct { 19 type mcItem struct {
19 key string 20 key string
20 value []byte 21 value []byte
21 object interface{}
22 flags uint32 22 flags uint32
23 expiration time.Duration 23 expiration time.Duration
24 24
25 CasID uint64 25 CasID uint64
26 } 26 }
27 27
28 var _ mc.Item = (*mcItem)(nil) 28 var _ mc.Item = (*mcItem)(nil)
29 29
30 func (m *mcItem) Key() string { return m.key } 30 func (m *mcItem) Key() string { return m.key }
31 func (m *mcItem) Value() []byte { return m.value } 31 func (m *mcItem) Value() []byte { return m.value }
32 func (m *mcItem) Object() interface{} { return m.object }
33 func (m *mcItem) Flags() uint32 { return m.flags } 32 func (m *mcItem) Flags() uint32 { return m.flags }
34 func (m *mcItem) Expiration() time.Duration { return m.expiration } 33 func (m *mcItem) Expiration() time.Duration { return m.expiration }
35 34
36 func (m *mcItem) SetKey(key string) mc.Item { 35 func (m *mcItem) SetKey(key string) mc.Item {
37 m.key = key 36 m.key = key
38 return m 37 return m
39 } 38 }
40 func (m *mcItem) SetValue(val []byte) mc.Item { 39 func (m *mcItem) SetValue(val []byte) mc.Item {
41 m.value = val 40 m.value = val
42 return m 41 return m
43 } 42 }
44 func (m *mcItem) SetObject(obj interface{}) mc.Item {
45 m.object = obj
46 return m
47 }
48 func (m *mcItem) SetFlags(flg uint32) mc.Item { 43 func (m *mcItem) SetFlags(flg uint32) mc.Item {
49 m.flags = flg 44 m.flags = flg
50 return m 45 return m
51 } 46 }
52 func (m *mcItem) SetExpiration(exp time.Duration) mc.Item { 47 func (m *mcItem) SetExpiration(exp time.Duration) mc.Item {
53 m.expiration = exp 48 m.expiration = exp
54 return m 49 return m
55 } 50 }
56 51
57 func (m *mcItem) duplicate() *mcItem { 52 func (m *mcItem) SetAll(other mc.Item) {
53 » *m = *other.(*mcItem).duplicate(false)
54 }
55
56 func (m *mcItem) duplicate(deep bool) *mcItem {
58 ret := mcItem{} 57 ret := mcItem{}
59 ret = *m 58 ret = *m
60 » ret.value = make([]byte, len(m.value)) 59 » if deep {
61 » copy(ret.value, m.value) 60 » » ret.value = make([]byte, len(m.value))
61 » » copy(ret.value, m.value)
62 » }
62 return &ret 63 return &ret
63 } 64 }
64 65
65 type memcacheData struct { 66 type memcacheData struct {
66 » lock sync.Mutex 67 » lock sync.RWMutex
67 items map[string]*mcItem 68 items map[string]*mcItem
68 casID uint64 69 casID uint64
70
71 stats mc.Statistics
72 }
73
74 func (m *memcacheData) mkItemLocked(now time.Time, i mc.Item) (ret *mcItem) {
75 m.casID++
76
77 var exp time.Duration
78 if i.Expiration() != 0 {
79 exp = time.Duration(now.Add(i.Expiration()).UnixNano())
80 }
81 value := make([]byte, len(i.Value()))
82 copy(value, i.Value())
83 return &mcItem{
84 key: i.Key(),
85 flags: i.Flags(),
86 expiration: exp,
87 value: value,
88 CasID: m.casID,
89 }
90 }
91
92 func (m *memcacheData) setItemLocked(now time.Time, i mc.Item) {
93 if cur, ok := m.items[i.Key()]; ok {
94 m.stats.Items--
95 m.stats.Bytes -= uint64(len(cur.value))
96 }
97 m.stats.Items++
98 m.stats.Bytes += uint64(len(i.Value()))
99 m.items[i.Key()] = m.mkItemLocked(now, i)
100 }
101
102 func (m *memcacheData) delItemLocked(k string) {
103 if itm, ok := m.items[k]; ok {
104 m.stats.Items--
105 m.stats.Bytes -= uint64(len(itm.value))
106 delete(m.items, k)
107 }
108 }
109
110 func (m *memcacheData) reset() {
111 m.stats = mc.Statistics{}
112 m.items = map[string]*mcItem{}
113 }
114
115 func (m *memcacheData) hasItemLocked(now time.Time, key string) bool {
116 ret, ok := m.items[key]
117 if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(now.U nixNano()) {
118 m.delItemLocked(key)
119 return false
120 }
121 return ok
122 }
123
124 func (m *memcacheData) retrieveLocked(now time.Time, key string) (*mcItem, error ) {
125 if !m.hasItemLocked(now, key) {
126 m.stats.Misses++
127 return nil, mc.ErrCacheMiss
128 }
129
130 ret := m.items[key]
131 m.stats.Hits++
132 m.stats.ByteHits += uint64(len(ret.value))
133 return ret, nil
69 } 134 }
70 135
71 // memcacheImpl binds the current connection's memcache data to an 136 // memcacheImpl binds the current connection's memcache data to an
72 // implementation of {gae.Memcache, gae.Testable}. 137 // implementation of {gae.Memcache, gae.Testable}.
73 type memcacheImpl struct { 138 type memcacheImpl struct {
74 mc.Interface
75
76 data *memcacheData 139 data *memcacheData
77 ctx context.Context 140 ctx context.Context
78 } 141 }
79 142
80 var _ mc.Interface = (*memcacheImpl)(nil) 143 var _ mc.RawInterface = (*memcacheImpl)(nil)
81 144
82 // useMC adds a gae.Memcache implementation to context, accessible 145 // useMC adds a gae.Memcache implementation to context, accessible
83 // by gae.GetMC(c) 146 // by gae.GetMC(c)
84 func useMC(c context.Context) context.Context { 147 func useMC(c context.Context) context.Context {
85 lck := sync.Mutex{} 148 lck := sync.Mutex{}
86 mcdMap := map[string]*memcacheData{} 149 mcdMap := map[string]*memcacheData{}
87 150
88 » return mc.SetFactory(c, func(ic context.Context) mc.Interface { 151 » return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface {
89 lck.Lock() 152 lck.Lock()
90 defer lck.Unlock() 153 defer lck.Unlock()
91 154
92 ns := curGID(ic).namespace 155 ns := curGID(ic).namespace
93 mcd, ok := mcdMap[ns] 156 mcd, ok := mcdMap[ns]
94 if !ok { 157 if !ok {
95 mcd = &memcacheData{items: map[string]*mcItem{}} 158 mcd = &memcacheData{items: map[string]*mcItem{}}
96 mcdMap[ns] = mcd 159 mcdMap[ns] = mcd
97 } 160 }
98 161
99 return &memcacheImpl{ 162 return &memcacheImpl{
100 dummy.Memcache(),
101 mcd, 163 mcd,
102 ic, 164 ic,
103 } 165 }
104 }) 166 })
105 } 167 }
106 168
107 func (m *memcacheImpl) mkItemLocked(i mc.Item) (ret *mcItem) {
108 m.data.casID++
109
110 var exp time.Duration
111 if i.Expiration() != 0 {
112 exp = time.Duration(clock.Now(m.ctx).Add(i.Expiration()).UnixNan o())
113 }
114 newItem := mcItem{
115 key: i.Key(),
116 flags: i.Flags(),
117 expiration: exp,
118 value: i.Value(),
119 CasID: m.data.casID,
120 }
121 return newItem.duplicate()
122 }
123
124 func (m *memcacheImpl) NewItem(key string) mc.Item { 169 func (m *memcacheImpl) NewItem(key string) mc.Item {
125 return &mcItem{key: key} 170 return &mcItem{key: key}
126 } 171 }
127 172
128 // Add implements context.MCSingleReadWriter.Add. 173 func doCBs(items []mc.Item, cb mc.RawCB, inner func(mc.Item) error) {
129 func (m *memcacheImpl) Add(i mc.Item) error { 174 » // This weird construction is so that we:
175 » // - don't take the lock for the entire multi operation, since it coul d imply
176 » // false atomicity.
177 » // - don't allow cb to block the actual batch operation, since that wo uld
178 » // allow binding in ways that aren't possible under the real
179 » // implementation (like a recursive deadlock)
180 » errs := make([]error, len(items))
181 » for i, itm := range items {
182 » » errs[i] = inner(itm)
183 » }
184 » for _, e := range errs {
185 » » cb(e)
186 » }
187 }
188
189 func (m *memcacheImpl) AddMulti(items []mc.Item, cb mc.RawCB) error {
190 » now := clock.Now(m.ctx)
191 » doCBs(items, cb, func(itm mc.Item) error {
192 » » m.data.lock.Lock()
193 » » defer m.data.lock.Unlock()
194 » » if !m.data.hasItemLocked(now, itm.Key()) {
195 » » » m.data.setItemLocked(now, itm)
196 » » » return nil
197 » » } else {
198 » » » return (mc.ErrNotStored)
199 » » }
200 » })
201 » return nil
202 }
203
204 func (m *memcacheImpl) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error {
205 » now := clock.Now(m.ctx)
206 » doCBs(items, cb, func(itm mc.Item) error {
207 » » m.data.lock.Lock()
208 » » defer m.data.lock.Unlock()
209
210 » » if cur, err := m.data.retrieveLocked(now, itm.Key()); err == nil {
211 » » » casid := uint64(0)
212 » » » if mi, ok := itm.(*mcItem); ok && mi != nil {
213 » » » » casid = mi.CasID
214 » » » }
215
216 » » » if cur.CasID == casid {
217 » » » » m.data.setItemLocked(now, itm)
218 » » » } else {
219 » » » » return mc.ErrCASConflict
220 » » » }
221 » » » return nil
222 » » }
223 » » return mc.ErrNotStored
224 » })
225 » return nil
226 }
227
228 func (m *memcacheImpl) SetMulti(items []mc.Item, cb mc.RawCB) error {
229 » now := clock.Now(m.ctx)
230 » doCBs(items, cb, func(itm mc.Item) error {
231 » » m.data.lock.Lock()
232 » » defer m.data.lock.Unlock()
233 » » m.data.setItemLocked(now, itm)
234 » » return nil
235 » })
236 » return nil
237 }
238
239 func (m *memcacheImpl) GetMulti(keys []string, cb mc.RawItemCB) error {
240 » now := clock.Now(m.ctx)
241
242 » itms := make([]mc.Item, len(keys))
243 » errs := make([]error, len(keys))
244
245 » for i, k := range keys {
246 » » itms[i], errs[i] = func() (mc.Item, error) {
247 » » » m.data.lock.RLock()
248 » » » defer m.data.lock.RUnlock()
249 » » » val, err := m.data.retrieveLocked(now, k)
250 » » » if err != nil {
251 » » » » return nil, err
252 » » » }
253 » » » return val.duplicate(true).SetExpiration(0), nil
254 » » }()
255 » }
256
257 » for i, itm := range itms {
258 » » cb(itm, errs[i])
259 » }
260
261 » return nil
262 }
263
264 func (m *memcacheImpl) DeleteMulti(keys []string, cb mc.RawCB) error {
265 » now := clock.Now(m.ctx)
266
267 » errs := make([]error, len(keys))
268
269 » for i, k := range keys {
270 » » errs[i] = func() error {
271 » » » m.data.lock.Lock()
272 » » » defer m.data.lock.Unlock()
273 » » » _, err := m.data.retrieveLocked(now, k)
274 » » » if err != nil {
275 » » » » return err
276 » » » }
277 » » » m.data.delItemLocked(k)
278 » » » return nil
279 » » }()
280 » }
281
282 » for _, e := range errs {
283 » » cb(e)
284 » }
285
286 » return nil
287 }
288
289 func (m *memcacheImpl) Flush() error {
130 m.data.lock.Lock() 290 m.data.lock.Lock()
131 defer m.data.lock.Unlock() 291 defer m.data.lock.Unlock()
132 292
133 » if _, ok := m.retrieveLocked(i.Key()); !ok { 293 » m.data.reset()
134 » » m.data.items[i.Key()] = m.mkItemLocked(i) 294 » return nil
135 » » return nil
136 » }
137 » return mc.ErrNotStored
138 } 295 }
139 296
140 // CompareAndSwap implements context.MCSingleReadWriter.CompareAndSwap. 297 func (m *memcacheImpl) Increment(key string, delta int64, initialValue *uint64) (uint64, error) {
141 func (m *memcacheImpl) CompareAndSwap(item mc.Item) error { 298 » now := clock.Now(m.ctx)
299
142 m.data.lock.Lock() 300 m.data.lock.Lock()
143 defer m.data.lock.Unlock() 301 defer m.data.lock.Unlock()
144 302
145 » if cur, ok := m.retrieveLocked(item.Key()); ok { 303 » cur := uint64(0)
146 » » casid := uint64(0) 304 » if initialValue == nil {
147 » » if mi, ok := item.(*mcItem); ok && mi != nil { 305 » » curItm, err := m.data.retrieveLocked(now, key)
148 » » » casid = mi.CasID 306 » » if err != nil {
307 » » » return 0, err
149 } 308 }
150 309 » » if len(curItm.value) != 8 {
151 » » if cur.CasID == casid { 310 » » » return 0, errors.New("memcache Increment: got invalid cu rrent value")
152 » » » m.data.items[item.Key()] = m.mkItemLocked(item) 311 » » }
312 » » cur = binary.LittleEndian.Uint64(curItm.value)
313 » } else {
314 » » cur = *initialValue
315 » }
316 » if delta < 0 {
317 » » if uint64(-delta) > cur {
318 » » » cur = 0
153 } else { 319 } else {
154 » » » return mc.ErrCASConflict 320 » » » cur -= uint64(-delta)
155 } 321 }
156 } else { 322 } else {
157 » » return mc.ErrNotStored 323 » » cur += uint64(delta)
158 } 324 }
159 » return nil 325
326 » newval := make([]byte, 8)
327 » binary.LittleEndian.PutUint64(newval, cur)
328 » m.data.setItemLocked(now, m.NewItem(key).SetValue(newval))
329
330 » return cur, nil
160 } 331 }
161 332
162 // Set implements context.MCSingleReadWriter.Set. 333 func (m *memcacheImpl) Stats() (*mc.Statistics, error) {
163 func (m *memcacheImpl) Set(i mc.Item) error { 334 » m.data.lock.RLock()
164 » m.data.lock.Lock() 335 » defer m.data.lock.RUnlock()
165 » defer m.data.lock.Unlock() 336
166 » m.data.items[i.Key()] = m.mkItemLocked(i) 337 » ret := m.data.stats
167 » return nil 338 » return &ret, nil
168 } 339 }
169
170 // Get implements context.MCSingleReadWriter.Get.
171 func (m *memcacheImpl) Get(key string) (itm mc.Item, err error) {
172 m.data.lock.Lock()
173 defer m.data.lock.Unlock()
174 if val, ok := m.retrieveLocked(key); ok {
175 itm = val.duplicate().SetExpiration(0)
176 } else {
177 err = mc.ErrCacheMiss
178 }
179 return
180 }
181
182 // Delete implements context.MCSingleReadWriter.Delete.
183 func (m *memcacheImpl) Delete(key string) error {
184 m.data.lock.Lock()
185 defer m.data.lock.Unlock()
186
187 if _, ok := m.retrieveLocked(key); ok {
188 delete(m.data.items, key)
189 return nil
190 }
191 return mc.ErrCacheMiss
192 }
193
194 func (m *memcacheImpl) retrieveLocked(key string) (*mcItem, bool) {
195 ret, ok := m.data.items[key]
196 if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(clock .Now(m.ctx).UnixNano()) {
197 ret = nil
198 ok = false
199 delete(m.data.items, key)
200 }
201 return ret, ok
202 }
OLDNEW
« no previous file with comments | « impl/dummy/dummy_test.go ('k') | impl/memory/memcache_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698