OLD | NEW |
---|---|
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
8 "encoding/binary" | |
8 "sync" | 9 "sync" |
9 "time" | 10 "time" |
10 | 11 |
11 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
12 | 13 |
13 "github.com/luci/gae/impl/dummy" | |
14 mc "github.com/luci/gae/service/memcache" | 14 mc "github.com/luci/gae/service/memcache" |
15 "github.com/luci/luci-go/common/clock" | 15 "github.com/luci/luci-go/common/clock" |
16 "github.com/luci/luci-go/common/errors" | |
16 ) | 17 ) |
17 | 18 |
18 type mcItem struct { | 19 type mcItem struct { |
19 key string | 20 key string |
20 value []byte | 21 value []byte |
21 object interface{} | |
22 flags uint32 | 22 flags uint32 |
23 expiration time.Duration | 23 expiration time.Duration |
24 | 24 |
25 CasID uint64 | 25 CasID uint64 |
26 } | 26 } |
27 | 27 |
28 var _ mc.Item = (*mcItem)(nil) | 28 var _ mc.Item = (*mcItem)(nil) |
29 | 29 |
30 func (m *mcItem) Key() string { return m.key } | 30 func (m *mcItem) Key() string { return m.key } |
31 func (m *mcItem) Value() []byte { return m.value } | 31 func (m *mcItem) Value() []byte { return m.value } |
32 func (m *mcItem) Object() interface{} { return m.object } | |
33 func (m *mcItem) Flags() uint32 { return m.flags } | 32 func (m *mcItem) Flags() uint32 { return m.flags } |
34 func (m *mcItem) Expiration() time.Duration { return m.expiration } | 33 func (m *mcItem) Expiration() time.Duration { return m.expiration } |
35 | 34 |
36 func (m *mcItem) SetKey(key string) mc.Item { | 35 func (m *mcItem) SetKey(key string) mc.Item { |
37 m.key = key | 36 m.key = key |
38 return m | 37 return m |
39 } | 38 } |
40 func (m *mcItem) SetValue(val []byte) mc.Item { | 39 func (m *mcItem) SetValue(val []byte) mc.Item { |
41 m.value = val | 40 m.value = val |
42 return m | 41 return m |
43 } | 42 } |
44 func (m *mcItem) SetObject(obj interface{}) mc.Item { | |
45 m.object = obj | |
46 return m | |
47 } | |
48 func (m *mcItem) SetFlags(flg uint32) mc.Item { | 43 func (m *mcItem) SetFlags(flg uint32) mc.Item { |
49 m.flags = flg | 44 m.flags = flg |
50 return m | 45 return m |
51 } | 46 } |
52 func (m *mcItem) SetExpiration(exp time.Duration) mc.Item { | 47 func (m *mcItem) SetExpiration(exp time.Duration) mc.Item { |
53 m.expiration = exp | 48 m.expiration = exp |
54 return m | 49 return m |
55 } | 50 } |
56 | 51 |
57 func (m *mcItem) duplicate() *mcItem { | 52 func (m *mcItem) SetAll(other mc.Item) { |
53 » *m = *other.(*mcItem).duplicate(false) | |
54 } | |
55 | |
56 func (m *mcItem) duplicate(deep bool) *mcItem { | |
58 ret := mcItem{} | 57 ret := mcItem{} |
59 ret = *m | 58 ret = *m |
60 » ret.value = make([]byte, len(m.value)) | 59 » if deep { |
61 » copy(ret.value, m.value) | 60 » » ret.value = make([]byte, len(m.value)) |
61 » » copy(ret.value, m.value) | |
62 » } | |
62 return &ret | 63 return &ret |
63 } | 64 } |
64 | 65 |
65 type memcacheData struct { | 66 type memcacheData struct { |
66 » lock sync.Mutex | 67 » lock sync.RWMutex |
67 items map[string]*mcItem | 68 items map[string]*mcItem |
68 casID uint64 | 69 casID uint64 |
70 | |
71 stats mc.Statistics | |
72 } | |
73 | |
74 func (m *memcacheData) mkItemLocked(now time.Time, i mc.Item) (ret *mcItem) { | |
75 m.casID++ | |
76 | |
77 var exp time.Duration | |
dnj
2015/08/03 22:37:25
Maybe:
exp := i.Expiration()
if exp != 0 {
exp =
iannucci
2015/08/04 01:21:20
https://github.com/golang/appengine/blob/master/me
| |
78 if i.Expiration() != 0 { | |
79 exp = time.Duration(now.Add(i.Expiration()).UnixNano()) | |
80 } | |
81 newItem := mcItem{ | |
82 key: i.Key(), | |
83 flags: i.Flags(), | |
84 expiration: exp, | |
85 value: i.Value(), | |
86 CasID: m.casID, | |
87 } | |
88 return newItem.duplicate(true) | |
dnj
2015/08/03 22:37:25
This seems like a weird way of doing this. Allocat
iannucci
2015/08/04 01:21:20
Done
| |
89 } | |
90 | |
91 func (m *memcacheData) setItemLocked(now time.Time, i mc.Item) { | |
92 if cur, ok := m.items[i.Key()]; ok { | |
93 m.stats.Items-- | |
94 m.stats.Bytes -= uint64(len(cur.value)) | |
95 } | |
96 m.stats.Items++ | |
97 m.stats.Bytes += uint64(len(i.Value())) | |
98 m.items[i.Key()] = m.mkItemLocked(now, i) | |
dnj
2015/08/03 22:37:25
Let "m.items" be allowed to be nil and test/init h
iannucci
2015/08/04 01:21:20
Why?
| |
99 } | |
100 | |
101 func (m *memcacheData) delItemLocked(k string) { | |
102 if itm, ok := m.items[k]; ok { | |
103 m.stats.Items-- | |
104 m.stats.Bytes -= uint64(len(itm.value)) | |
105 delete(m.items, k) | |
106 } | |
107 } | |
108 | |
109 func (m *memcacheData) reset() { | |
110 m.stats = mc.Statistics{} | |
111 m.items = map[string]*mcItem{} | |
dnj
2015/08/03 22:37:25
Should just "nil" it here.
iannucci
2015/08/04 01:21:20
Why?
dnj (Google)
2015/08/04 03:48:03
No reason to keep allocated empty maps around.
iannucci
2015/08/04 03:58:33
yeah, but if it doesn't have data in it, then you'
dnj
2015/08/04 18:01:16
Makes sense :)
| |
112 } | |
113 | |
114 func (m *memcacheData) hasLocked(now time.Time, key string) bool { | |
dnj
2015/08/03 22:37:25
I read this as "has locked?". Maybe rename, "hasIt
iannucci
2015/08/04 01:21:20
Done.
| |
115 ret, ok := m.items[key] | |
116 if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(now.U nixNano()) { | |
dnj
2015/08/03 22:37:24
Yeah ret.Expiration() really should just be a time
iannucci
2015/08/04 01:21:20
I could do that in another CL. It'll cause more in
dnj (Google)
2015/08/04 03:48:03
Storing nanosecond since epoch in a time.Duration
iannucci
2015/08/04 03:58:33
Yes, that's fine, but the sdk memcache API uses ti
dnj
2015/08/04 18:01:16
Acknowledged.
| |
117 m.delItemLocked(key) | |
118 return false | |
119 } | |
120 return ok | |
121 } | |
122 | |
123 func (m *memcacheData) retrieveLocked(now time.Time, key string) (*mcItem, error ) { | |
124 if !m.hasLocked(now, key) { | |
125 m.stats.Misses++ | |
126 return nil, mc.ErrCacheMiss | |
127 } | |
128 | |
129 ret := m.items[key] | |
130 m.stats.Hits++ | |
131 m.stats.ByteHits += uint64(len(ret.value)) | |
132 return ret, nil | |
69 } | 133 } |
70 | 134 |
71 // memcacheImpl binds the current connection's memcache data to an | 135 // memcacheImpl binds the current connection's memcache data to an |
72 // implementation of {gae.Memcache, gae.Testable}. | 136 // implementation of {gae.Memcache, gae.Testable}. |
73 type memcacheImpl struct { | 137 type memcacheImpl struct { |
74 mc.Interface | |
75 | |
76 data *memcacheData | 138 data *memcacheData |
77 ctx context.Context | 139 ctx context.Context |
78 } | 140 } |
79 | 141 |
80 var _ mc.Interface = (*memcacheImpl)(nil) | 142 var _ mc.RawInterface = (*memcacheImpl)(nil) |
81 | 143 |
82 // useMC adds a gae.Memcache implementation to context, accessible | 144 // useMC adds a gae.Memcache implementation to context, accessible |
83 // by gae.GetMC(c) | 145 // by gae.GetMC(c) |
84 func useMC(c context.Context) context.Context { | 146 func useMC(c context.Context) context.Context { |
85 lck := sync.Mutex{} | 147 lck := sync.Mutex{} |
86 mcdMap := map[string]*memcacheData{} | 148 mcdMap := map[string]*memcacheData{} |
87 | 149 |
88 » return mc.SetFactory(c, func(ic context.Context) mc.Interface { | 150 » return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface { |
89 lck.Lock() | 151 lck.Lock() |
90 defer lck.Unlock() | 152 defer lck.Unlock() |
91 | 153 |
92 ns := curGID(ic).namespace | 154 ns := curGID(ic).namespace |
93 mcd, ok := mcdMap[ns] | 155 mcd, ok := mcdMap[ns] |
94 if !ok { | 156 if !ok { |
95 mcd = &memcacheData{items: map[string]*mcItem{}} | 157 mcd = &memcacheData{items: map[string]*mcItem{}} |
96 mcdMap[ns] = mcd | 158 mcdMap[ns] = mcd |
97 } | 159 } |
98 | 160 |
99 return &memcacheImpl{ | 161 return &memcacheImpl{ |
100 dummy.Memcache(), | |
101 mcd, | 162 mcd, |
102 ic, | 163 ic, |
103 } | 164 } |
104 }) | 165 }) |
105 } | 166 } |
106 | 167 |
107 func (m *memcacheImpl) mkItemLocked(i mc.Item) (ret *mcItem) { | |
108 m.data.casID++ | |
109 | |
110 var exp time.Duration | |
111 if i.Expiration() != 0 { | |
112 exp = time.Duration(clock.Now(m.ctx).Add(i.Expiration()).UnixNan o()) | |
113 } | |
114 newItem := mcItem{ | |
115 key: i.Key(), | |
116 flags: i.Flags(), | |
117 expiration: exp, | |
118 value: i.Value(), | |
119 CasID: m.data.casID, | |
120 } | |
121 return newItem.duplicate() | |
122 } | |
123 | |
124 func (m *memcacheImpl) NewItem(key string) mc.Item { | 168 func (m *memcacheImpl) NewItem(key string) mc.Item { |
125 return &mcItem{key: key} | 169 return &mcItem{key: key} |
126 } | 170 } |
127 | 171 |
128 // Add implements context.MCSingleReadWriter.Add. | 172 func (m *memcacheImpl) AddMulti(items []mc.Item, cb mc.RawCB) error { |
129 func (m *memcacheImpl) Add(i mc.Item) error { | |
130 m.data.lock.Lock() | 173 m.data.lock.Lock() |
131 defer m.data.lock.Unlock() | 174 defer m.data.lock.Unlock() |
132 | 175 |
133 » if _, ok := m.retrieveLocked(i.Key()); !ok { | 176 » for _, itm := range items { |
134 » » m.data.items[i.Key()] = m.mkItemLocked(i) | 177 » » now := clock.Now(m.ctx) |
dnj
2015/08/03 22:37:25
Do this once outside of the lock.
iannucci
2015/08/04 01:21:20
Done.
| |
135 » » return nil | 178 » » if !m.data.hasLocked(now, itm.Key()) { |
136 » } | 179 » » » m.data.setItemLocked(now, itm) |
137 » return mc.ErrNotStored | 180 » » » cb(nil) |
138 } | 181 » » } else { |
139 | 182 » » » cb(mc.ErrNotStored) |
140 // CompareAndSwap implements context.MCSingleReadWriter.CompareAndSwap. | |
141 func (m *memcacheImpl) CompareAndSwap(item mc.Item) error { | |
142 » m.data.lock.Lock() | |
143 » defer m.data.lock.Unlock() | |
144 | |
145 » if cur, ok := m.retrieveLocked(item.Key()); ok { | |
146 » » casid := uint64(0) | |
147 » » if mi, ok := item.(*mcItem); ok && mi != nil { | |
148 » » » casid = mi.CasID | |
149 } | 183 } |
150 | |
151 if cur.CasID == casid { | |
152 m.data.items[item.Key()] = m.mkItemLocked(item) | |
153 } else { | |
154 return mc.ErrCASConflict | |
155 } | |
156 } else { | |
157 return mc.ErrNotStored | |
158 } | 184 } |
159 return nil | 185 return nil |
160 } | 186 } |
161 | 187 |
162 // Set implements context.MCSingleReadWriter.Set. | 188 func (m *memcacheImpl) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error { |
163 func (m *memcacheImpl) Set(i mc.Item) error { | |
164 m.data.lock.Lock() | 189 m.data.lock.Lock() |
165 defer m.data.lock.Unlock() | 190 defer m.data.lock.Unlock() |
166 » m.data.items[i.Key()] = m.mkItemLocked(i) | 191 |
192 » for _, itm := range items { | |
193 » » now := clock.Now(m.ctx) | |
dnj
2015/08/03 22:37:25
Do this once outside of the lock.
iannucci
2015/08/04 01:21:20
Done.
| |
194 » » err := error(nil) | |
195 » » cur := (*mcItem)(nil) | |
196 » » if cur, err = m.data.retrieveLocked(now, itm.Key()); err == nil { | |
197 » » » casid := uint64(0) | |
198 » » » if mi, ok := itm.(*mcItem); ok && mi != nil { | |
199 » » » » casid = mi.CasID | |
200 » » » } | |
201 | |
202 » » » if cur.CasID == casid { | |
203 » » » » m.data.setItemLocked(now, itm) | |
204 » » » } else { | |
205 » » » » err = mc.ErrCASConflict | |
206 » » » } | |
207 » » } else { | |
208 » » » err = mc.ErrNotStored | |
209 » » } | |
210 » » cb(err) | |
211 » } | |
212 | |
167 return nil | 213 return nil |
168 } | 214 } |
169 | 215 |
170 // Get implements context.MCSingleReadWriter.Get. | 216 func (m *memcacheImpl) SetMulti(items []mc.Item, cb mc.RawCB) error { |
171 func (m *memcacheImpl) Get(key string) (itm mc.Item, err error) { | |
172 m.data.lock.Lock() | 217 m.data.lock.Lock() |
173 defer m.data.lock.Unlock() | 218 defer m.data.lock.Unlock() |
174 » if val, ok := m.retrieveLocked(key); ok { | 219 » for _, itm := range items { |
175 » » itm = val.duplicate().SetExpiration(0) | 220 » » m.data.setItemLocked(clock.Now(m.ctx), itm) |
dnj
2015/08/03 22:37:25
Get "now" once, outside of the lock/loop.
iannucci
2015/08/04 01:21:20
Done.
| |
176 » } else { | 221 » » cb(nil) |
dnj
2015/08/03 22:37:25
Consider setting the items within the lock, but do
iannucci
2015/08/04 01:21:20
Done.
| |
177 » » err = mc.ErrCacheMiss | |
178 } | 222 } |
179 » return | 223 » return nil |
180 } | 224 } |
181 | 225 |
182 // Delete implements context.MCSingleReadWriter.Delete. | 226 func (m *memcacheImpl) GetMulti(keys []string, cb mc.RawItemCB) error { |
183 func (m *memcacheImpl) Delete(key string) error { | 227 » m.data.lock.RLock() |
228 » defer m.data.lock.RUnlock() | |
229 | |
230 » for _, k := range keys { | |
231 » » itm := (mc.Item)(nil) | |
232 » » val, err := m.data.retrieveLocked(clock.Now(m.ctx), k) | |
dnj
2015/08/03 22:37:25
Get "now" once, outside of the lock/loop.
iannucci
2015/08/04 01:21:20
Done.
| |
233 » » if err == nil { | |
234 » » » itm = val.duplicate(true).SetExpiration(0) | |
235 » » } | |
236 » » cb(itm, err) | |
237 » } | |
238 | |
239 » return nil | |
240 } | |
241 | |
242 func (m *memcacheImpl) DeleteMulti(keys []string, cb mc.RawCB) error { | |
184 m.data.lock.Lock() | 243 m.data.lock.Lock() |
185 defer m.data.lock.Unlock() | 244 defer m.data.lock.Unlock() |
186 | 245 |
187 » if _, ok := m.retrieveLocked(key); ok { | 246 » for _, k := range keys { |
188 » » delete(m.data.items, key) | 247 » » _, err := m.data.retrieveLocked(clock.Now(m.ctx), k) |
189 » » return nil | 248 » » if err == nil { |
249 » » » m.data.delItemLocked(k) | |
250 » » } | |
251 » » cb(err) | |
190 } | 252 } |
191 » return mc.ErrCacheMiss | 253 |
254 » return nil | |
192 } | 255 } |
193 | 256 |
194 func (m *memcacheImpl) retrieveLocked(key string) (*mcItem, bool) { | 257 func (m *memcacheImpl) Flush() error { |
195 » ret, ok := m.data.items[key] | 258 » m.data.lock.Lock() |
196 » if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(clock .Now(m.ctx).UnixNano()) { | 259 » defer m.data.lock.Unlock() |
197 » » ret = nil | 260 |
198 » » ok = false | 261 » m.data.reset() |
199 » » delete(m.data.items, key) | 262 » return nil |
263 } | |
264 | |
265 func (m *memcacheImpl) Increment(key string, delta int64, initialValue *uint64) (uint64, error) { | |
266 » m.data.lock.RLock() | |
267 » defer m.data.lock.RUnlock() | |
268 | |
269 » now := clock.Now(m.ctx) | |
dnj
2015/08/03 22:37:25
Do this outside of the lock.
iannucci
2015/08/04 01:21:20
Done.
| |
270 | |
271 » cur := uint64(0) | |
272 » if initialValue == nil { | |
273 » » curItm, err := m.data.retrieveLocked(now, key) | |
274 » » if err != nil { | |
275 » » » return 0, err | |
276 » » } | |
277 » » if len(curItm.value) != 8 { | |
278 » » » return 0, errors.New("memcache Increment: got invalid cu rrent value") | |
279 » » } | |
280 » » cur = binary.LittleEndian.Uint64(curItm.value) | |
281 » } else { | |
282 » » cur = *initialValue | |
200 } | 283 } |
201 » return ret, ok | 284 » if delta < 0 { |
285 » » if uint64(-delta) > cur { | |
286 » » » cur = 0 | |
287 » » } else { | |
288 » » » cur -= uint64(-delta) | |
289 » » } | |
290 » } else { | |
291 » » cur += uint64(delta) | |
292 » } | |
293 | |
294 » newval := make([]byte, 8) | |
295 » binary.LittleEndian.PutUint64(newval, cur) | |
296 » m.data.setItemLocked(now, m.NewItem(key).SetValue(newval)) | |
dnj
2015/08/03 22:37:25
This isn't safe to do with just an RLock.
iannucci
2015/08/04 01:21:20
oops, typo
| |
297 | |
298 » return cur, nil | |
202 } | 299 } |
300 | |
301 func (m *memcacheImpl) Stats() (*mc.Statistics, error) { | |
302 m.data.lock.RLock() | |
303 defer m.data.lock.RUnlock() | |
304 | |
305 ret := m.data.stats | |
306 return &ret, nil | |
307 } | |
OLD | NEW |