OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package memory | 5 package memory |
6 | 6 |
7 import ( | 7 import ( |
| 8 "encoding/binary" |
8 "sync" | 9 "sync" |
9 "time" | 10 "time" |
10 | 11 |
11 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
12 | 13 |
13 "github.com/luci/gae/impl/dummy" | |
14 mc "github.com/luci/gae/service/memcache" | 14 mc "github.com/luci/gae/service/memcache" |
15 "github.com/luci/luci-go/common/clock" | 15 "github.com/luci/luci-go/common/clock" |
| 16 "github.com/luci/luci-go/common/errors" |
16 ) | 17 ) |
17 | 18 |
18 type mcItem struct { | 19 type mcItem struct { |
19 key string | 20 key string |
20 value []byte | 21 value []byte |
21 object interface{} | |
22 flags uint32 | 22 flags uint32 |
23 expiration time.Duration | 23 expiration time.Duration |
24 | 24 |
25 CasID uint64 | 25 CasID uint64 |
26 } | 26 } |
27 | 27 |
28 var _ mc.Item = (*mcItem)(nil) | 28 var _ mc.Item = (*mcItem)(nil) |
29 | 29 |
30 func (m *mcItem) Key() string { return m.key } | 30 func (m *mcItem) Key() string { return m.key } |
31 func (m *mcItem) Value() []byte { return m.value } | 31 func (m *mcItem) Value() []byte { return m.value } |
32 func (m *mcItem) Object() interface{} { return m.object } | |
33 func (m *mcItem) Flags() uint32 { return m.flags } | 32 func (m *mcItem) Flags() uint32 { return m.flags } |
34 func (m *mcItem) Expiration() time.Duration { return m.expiration } | 33 func (m *mcItem) Expiration() time.Duration { return m.expiration } |
35 | 34 |
36 func (m *mcItem) SetKey(key string) mc.Item { | 35 func (m *mcItem) SetKey(key string) mc.Item { |
37 m.key = key | 36 m.key = key |
38 return m | 37 return m |
39 } | 38 } |
40 func (m *mcItem) SetValue(val []byte) mc.Item { | 39 func (m *mcItem) SetValue(val []byte) mc.Item { |
41 m.value = val | 40 m.value = val |
42 return m | 41 return m |
43 } | 42 } |
44 func (m *mcItem) SetObject(obj interface{}) mc.Item { | |
45 m.object = obj | |
46 return m | |
47 } | |
48 func (m *mcItem) SetFlags(flg uint32) mc.Item { | 43 func (m *mcItem) SetFlags(flg uint32) mc.Item { |
49 m.flags = flg | 44 m.flags = flg |
50 return m | 45 return m |
51 } | 46 } |
52 func (m *mcItem) SetExpiration(exp time.Duration) mc.Item { | 47 func (m *mcItem) SetExpiration(exp time.Duration) mc.Item { |
53 m.expiration = exp | 48 m.expiration = exp |
54 return m | 49 return m |
55 } | 50 } |
56 | 51 |
57 func (m *mcItem) duplicate() *mcItem { | 52 func (m *mcItem) SetAll(other mc.Item) { |
| 53 » *m = *other.(*mcItem).duplicate(false) |
| 54 } |
| 55 |
| 56 func (m *mcItem) duplicate(deep bool) *mcItem { |
58 ret := mcItem{} | 57 ret := mcItem{} |
59 ret = *m | 58 ret = *m |
60 » ret.value = make([]byte, len(m.value)) | 59 » if deep { |
61 » copy(ret.value, m.value) | 60 » » ret.value = make([]byte, len(m.value)) |
| 61 » » copy(ret.value, m.value) |
| 62 » } |
62 return &ret | 63 return &ret |
63 } | 64 } |
64 | 65 |
65 type memcacheData struct { | 66 type memcacheData struct { |
66 » lock sync.Mutex | 67 » lock sync.RWMutex |
67 items map[string]*mcItem | 68 items map[string]*mcItem |
68 casID uint64 | 69 casID uint64 |
| 70 |
| 71 stats mc.Statistics |
| 72 } |
| 73 |
| 74 func (m *memcacheData) mkItemLocked(now time.Time, i mc.Item) (ret *mcItem) { |
| 75 m.casID++ |
| 76 |
| 77 var exp time.Duration |
| 78 if i.Expiration() != 0 { |
| 79 exp = time.Duration(now.Add(i.Expiration()).UnixNano()) |
| 80 } |
| 81 value := make([]byte, len(i.Value())) |
| 82 copy(value, i.Value()) |
| 83 return &mcItem{ |
| 84 key: i.Key(), |
| 85 flags: i.Flags(), |
| 86 expiration: exp, |
| 87 value: value, |
| 88 CasID: m.casID, |
| 89 } |
| 90 } |
| 91 |
| 92 func (m *memcacheData) setItemLocked(now time.Time, i mc.Item) { |
| 93 if cur, ok := m.items[i.Key()]; ok { |
| 94 m.stats.Items-- |
| 95 m.stats.Bytes -= uint64(len(cur.value)) |
| 96 } |
| 97 m.stats.Items++ |
| 98 m.stats.Bytes += uint64(len(i.Value())) |
| 99 m.items[i.Key()] = m.mkItemLocked(now, i) |
| 100 } |
| 101 |
| 102 func (m *memcacheData) delItemLocked(k string) { |
| 103 if itm, ok := m.items[k]; ok { |
| 104 m.stats.Items-- |
| 105 m.stats.Bytes -= uint64(len(itm.value)) |
| 106 delete(m.items, k) |
| 107 } |
| 108 } |
| 109 |
| 110 func (m *memcacheData) reset() { |
| 111 m.stats = mc.Statistics{} |
| 112 m.items = map[string]*mcItem{} |
| 113 } |
| 114 |
| 115 func (m *memcacheData) hasItemLocked(now time.Time, key string) bool { |
| 116 ret, ok := m.items[key] |
| 117 if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(now.U
nixNano()) { |
| 118 m.delItemLocked(key) |
| 119 return false |
| 120 } |
| 121 return ok |
| 122 } |
| 123 |
| 124 func (m *memcacheData) retrieveLocked(now time.Time, key string) (*mcItem, error
) { |
| 125 if !m.hasItemLocked(now, key) { |
| 126 m.stats.Misses++ |
| 127 return nil, mc.ErrCacheMiss |
| 128 } |
| 129 |
| 130 ret := m.items[key] |
| 131 m.stats.Hits++ |
| 132 m.stats.ByteHits += uint64(len(ret.value)) |
| 133 return ret, nil |
69 } | 134 } |
70 | 135 |
71 // memcacheImpl binds the current connection's memcache data to an | 136 // memcacheImpl binds the current connection's memcache data to an |
72 // implementation of {gae.Memcache, gae.Testable}. | 137 // implementation of {gae.Memcache, gae.Testable}. |
73 type memcacheImpl struct { | 138 type memcacheImpl struct { |
74 mc.Interface | |
75 | |
76 data *memcacheData | 139 data *memcacheData |
77 ctx context.Context | 140 ctx context.Context |
78 } | 141 } |
79 | 142 |
80 var _ mc.Interface = (*memcacheImpl)(nil) | 143 var _ mc.RawInterface = (*memcacheImpl)(nil) |
81 | 144 |
82 // useMC adds a gae.Memcache implementation to context, accessible | 145 // useMC adds a gae.Memcache implementation to context, accessible |
83 // by gae.GetMC(c) | 146 // by gae.GetMC(c) |
84 func useMC(c context.Context) context.Context { | 147 func useMC(c context.Context) context.Context { |
85 lck := sync.Mutex{} | 148 lck := sync.Mutex{} |
86 mcdMap := map[string]*memcacheData{} | 149 mcdMap := map[string]*memcacheData{} |
87 | 150 |
88 » return mc.SetFactory(c, func(ic context.Context) mc.Interface { | 151 » return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface { |
89 lck.Lock() | 152 lck.Lock() |
90 defer lck.Unlock() | 153 defer lck.Unlock() |
91 | 154 |
92 ns := curGID(ic).namespace | 155 ns := curGID(ic).namespace |
93 mcd, ok := mcdMap[ns] | 156 mcd, ok := mcdMap[ns] |
94 if !ok { | 157 if !ok { |
95 mcd = &memcacheData{items: map[string]*mcItem{}} | 158 mcd = &memcacheData{items: map[string]*mcItem{}} |
96 mcdMap[ns] = mcd | 159 mcdMap[ns] = mcd |
97 } | 160 } |
98 | 161 |
99 return &memcacheImpl{ | 162 return &memcacheImpl{ |
100 dummy.Memcache(), | |
101 mcd, | 163 mcd, |
102 ic, | 164 ic, |
103 } | 165 } |
104 }) | 166 }) |
105 } | 167 } |
106 | 168 |
107 func (m *memcacheImpl) mkItemLocked(i mc.Item) (ret *mcItem) { | |
108 m.data.casID++ | |
109 | |
110 var exp time.Duration | |
111 if i.Expiration() != 0 { | |
112 exp = time.Duration(clock.Now(m.ctx).Add(i.Expiration()).UnixNan
o()) | |
113 } | |
114 newItem := mcItem{ | |
115 key: i.Key(), | |
116 flags: i.Flags(), | |
117 expiration: exp, | |
118 value: i.Value(), | |
119 CasID: m.data.casID, | |
120 } | |
121 return newItem.duplicate() | |
122 } | |
123 | |
124 func (m *memcacheImpl) NewItem(key string) mc.Item { | 169 func (m *memcacheImpl) NewItem(key string) mc.Item { |
125 return &mcItem{key: key} | 170 return &mcItem{key: key} |
126 } | 171 } |
127 | 172 |
128 // Add implements context.MCSingleReadWriter.Add. | 173 func doCBs(items []mc.Item, cb mc.RawCB, inner func(mc.Item) error) { |
129 func (m *memcacheImpl) Add(i mc.Item) error { | 174 » // This weird construction is so that we: |
| 175 » // - don't take the lock for the entire multi operation, since it coul
d imply |
| 176 » // false atomicity. |
| 177 » // - don't allow cb to block the actual batch operation, since that wo
uld |
| 178 » // allow binding in ways that aren't possible under the real |
| 179 » // implementation (like a recursive deadlock) |
| 180 » errs := make([]error, len(items)) |
| 181 » for i, itm := range items { |
| 182 » » errs[i] = inner(itm) |
| 183 » } |
| 184 » for _, e := range errs { |
| 185 » » cb(e) |
| 186 » } |
| 187 } |
| 188 |
| 189 func (m *memcacheImpl) AddMulti(items []mc.Item, cb mc.RawCB) error { |
| 190 » now := clock.Now(m.ctx) |
| 191 » doCBs(items, cb, func(itm mc.Item) error { |
| 192 » » m.data.lock.Lock() |
| 193 » » defer m.data.lock.Unlock() |
| 194 » » if !m.data.hasItemLocked(now, itm.Key()) { |
| 195 » » » m.data.setItemLocked(now, itm) |
| 196 » » » return nil |
| 197 » » } else { |
| 198 » » » return (mc.ErrNotStored) |
| 199 » » } |
| 200 » }) |
| 201 » return nil |
| 202 } |
| 203 |
| 204 func (m *memcacheImpl) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error { |
| 205 » now := clock.Now(m.ctx) |
| 206 » doCBs(items, cb, func(itm mc.Item) error { |
| 207 » » m.data.lock.Lock() |
| 208 » » defer m.data.lock.Unlock() |
| 209 |
| 210 » » if cur, err := m.data.retrieveLocked(now, itm.Key()); err == nil
{ |
| 211 » » » casid := uint64(0) |
| 212 » » » if mi, ok := itm.(*mcItem); ok && mi != nil { |
| 213 » » » » casid = mi.CasID |
| 214 » » » } |
| 215 |
| 216 » » » if cur.CasID == casid { |
| 217 » » » » m.data.setItemLocked(now, itm) |
| 218 » » » } else { |
| 219 » » » » return mc.ErrCASConflict |
| 220 » » » } |
| 221 » » » return nil |
| 222 » » } |
| 223 » » return mc.ErrNotStored |
| 224 » }) |
| 225 » return nil |
| 226 } |
| 227 |
| 228 func (m *memcacheImpl) SetMulti(items []mc.Item, cb mc.RawCB) error { |
| 229 » now := clock.Now(m.ctx) |
| 230 » doCBs(items, cb, func(itm mc.Item) error { |
| 231 » » m.data.lock.Lock() |
| 232 » » defer m.data.lock.Unlock() |
| 233 » » m.data.setItemLocked(now, itm) |
| 234 » » return nil |
| 235 » }) |
| 236 » return nil |
| 237 } |
| 238 |
| 239 func (m *memcacheImpl) GetMulti(keys []string, cb mc.RawItemCB) error { |
| 240 » now := clock.Now(m.ctx) |
| 241 |
| 242 » itms := make([]mc.Item, len(keys)) |
| 243 » errs := make([]error, len(keys)) |
| 244 |
| 245 » for i, k := range keys { |
| 246 » » itms[i], errs[i] = func() (mc.Item, error) { |
| 247 » » » m.data.lock.RLock() |
| 248 » » » defer m.data.lock.RUnlock() |
| 249 » » » val, err := m.data.retrieveLocked(now, k) |
| 250 » » » if err != nil { |
| 251 » » » » return nil, err |
| 252 » » » } |
| 253 » » » return val.duplicate(true).SetExpiration(0), nil |
| 254 » » }() |
| 255 » } |
| 256 |
| 257 » for i, itm := range itms { |
| 258 » » cb(itm, errs[i]) |
| 259 » } |
| 260 |
| 261 » return nil |
| 262 } |
| 263 |
| 264 func (m *memcacheImpl) DeleteMulti(keys []string, cb mc.RawCB) error { |
| 265 » now := clock.Now(m.ctx) |
| 266 |
| 267 » errs := make([]error, len(keys)) |
| 268 |
| 269 » for i, k := range keys { |
| 270 » » errs[i] = func() error { |
| 271 » » » m.data.lock.Lock() |
| 272 » » » defer m.data.lock.Unlock() |
| 273 » » » _, err := m.data.retrieveLocked(now, k) |
| 274 » » » if err != nil { |
| 275 » » » » return err |
| 276 » » » } |
| 277 » » » m.data.delItemLocked(k) |
| 278 » » » return nil |
| 279 » » }() |
| 280 » } |
| 281 |
| 282 » for _, e := range errs { |
| 283 » » cb(e) |
| 284 » } |
| 285 |
| 286 » return nil |
| 287 } |
| 288 |
| 289 func (m *memcacheImpl) Flush() error { |
130 m.data.lock.Lock() | 290 m.data.lock.Lock() |
131 defer m.data.lock.Unlock() | 291 defer m.data.lock.Unlock() |
132 | 292 |
133 » if _, ok := m.retrieveLocked(i.Key()); !ok { | 293 » m.data.reset() |
134 » » m.data.items[i.Key()] = m.mkItemLocked(i) | 294 » return nil |
135 » » return nil | |
136 » } | |
137 » return mc.ErrNotStored | |
138 } | 295 } |
139 | 296 |
140 // CompareAndSwap implements context.MCSingleReadWriter.CompareAndSwap. | 297 func (m *memcacheImpl) Increment(key string, delta int64, initialValue *uint64)
(uint64, error) { |
141 func (m *memcacheImpl) CompareAndSwap(item mc.Item) error { | 298 » now := clock.Now(m.ctx) |
| 299 |
142 m.data.lock.Lock() | 300 m.data.lock.Lock() |
143 defer m.data.lock.Unlock() | 301 defer m.data.lock.Unlock() |
144 | 302 |
145 » if cur, ok := m.retrieveLocked(item.Key()); ok { | 303 » cur := uint64(0) |
146 » » casid := uint64(0) | 304 » if initialValue == nil { |
147 » » if mi, ok := item.(*mcItem); ok && mi != nil { | 305 » » curItm, err := m.data.retrieveLocked(now, key) |
148 » » » casid = mi.CasID | 306 » » if err != nil { |
| 307 » » » return 0, err |
149 } | 308 } |
150 | 309 » » if len(curItm.value) != 8 { |
151 » » if cur.CasID == casid { | 310 » » » return 0, errors.New("memcache Increment: got invalid cu
rrent value") |
152 » » » m.data.items[item.Key()] = m.mkItemLocked(item) | 311 » » } |
| 312 » » cur = binary.LittleEndian.Uint64(curItm.value) |
| 313 » } else { |
| 314 » » cur = *initialValue |
| 315 » } |
| 316 » if delta < 0 { |
| 317 » » if uint64(-delta) > cur { |
| 318 » » » cur = 0 |
153 } else { | 319 } else { |
154 » » » return mc.ErrCASConflict | 320 » » » cur -= uint64(-delta) |
155 } | 321 } |
156 } else { | 322 } else { |
157 » » return mc.ErrNotStored | 323 » » cur += uint64(delta) |
158 } | 324 } |
159 » return nil | 325 |
| 326 » newval := make([]byte, 8) |
| 327 » binary.LittleEndian.PutUint64(newval, cur) |
| 328 » m.data.setItemLocked(now, m.NewItem(key).SetValue(newval)) |
| 329 |
| 330 » return cur, nil |
160 } | 331 } |
161 | 332 |
162 // Set implements context.MCSingleReadWriter.Set. | 333 func (m *memcacheImpl) Stats() (*mc.Statistics, error) { |
163 func (m *memcacheImpl) Set(i mc.Item) error { | 334 » m.data.lock.RLock() |
164 » m.data.lock.Lock() | 335 » defer m.data.lock.RUnlock() |
165 » defer m.data.lock.Unlock() | 336 |
166 » m.data.items[i.Key()] = m.mkItemLocked(i) | 337 » ret := m.data.stats |
167 » return nil | 338 » return &ret, nil |
168 } | 339 } |
169 | |
170 // Get implements context.MCSingleReadWriter.Get. | |
171 func (m *memcacheImpl) Get(key string) (itm mc.Item, err error) { | |
172 m.data.lock.Lock() | |
173 defer m.data.lock.Unlock() | |
174 if val, ok := m.retrieveLocked(key); ok { | |
175 itm = val.duplicate().SetExpiration(0) | |
176 } else { | |
177 err = mc.ErrCacheMiss | |
178 } | |
179 return | |
180 } | |
181 | |
182 // Delete implements context.MCSingleReadWriter.Delete. | |
183 func (m *memcacheImpl) Delete(key string) error { | |
184 m.data.lock.Lock() | |
185 defer m.data.lock.Unlock() | |
186 | |
187 if _, ok := m.retrieveLocked(key); ok { | |
188 delete(m.data.items, key) | |
189 return nil | |
190 } | |
191 return mc.ErrCacheMiss | |
192 } | |
193 | |
194 func (m *memcacheImpl) retrieveLocked(key string) (*mcItem, bool) { | |
195 ret, ok := m.data.items[key] | |
196 if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(clock
.Now(m.ctx).UnixNano()) { | |
197 ret = nil | |
198 ok = false | |
199 delete(m.data.items, key) | |
200 } | |
201 return ret, ok | |
202 } | |
OLD | NEW |