| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package memory | 5 package memory |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "encoding/binary" |
| 8 "sync" | 9 "sync" |
| 9 "time" | 10 "time" |
| 10 | 11 |
| 11 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
| 12 | 13 |
| 13 "github.com/luci/gae/impl/dummy" | |
| 14 mc "github.com/luci/gae/service/memcache" | 14 mc "github.com/luci/gae/service/memcache" |
| 15 "github.com/luci/luci-go/common/clock" | 15 "github.com/luci/luci-go/common/clock" |
| 16 "github.com/luci/luci-go/common/errors" |
| 16 ) | 17 ) |
| 17 | 18 |
| 18 type mcItem struct { | 19 type mcItem struct { |
| 19 key string | 20 key string |
| 20 value []byte | 21 value []byte |
| 21 object interface{} | |
| 22 flags uint32 | 22 flags uint32 |
| 23 expiration time.Duration | 23 expiration time.Duration |
| 24 | 24 |
| 25 CasID uint64 | 25 CasID uint64 |
| 26 } | 26 } |
| 27 | 27 |
| 28 var _ mc.Item = (*mcItem)(nil) | 28 var _ mc.Item = (*mcItem)(nil) |
| 29 | 29 |
| 30 func (m *mcItem) Key() string { return m.key } | 30 func (m *mcItem) Key() string { return m.key } |
| 31 func (m *mcItem) Value() []byte { return m.value } | 31 func (m *mcItem) Value() []byte { return m.value } |
| 32 func (m *mcItem) Object() interface{} { return m.object } | |
| 33 func (m *mcItem) Flags() uint32 { return m.flags } | 32 func (m *mcItem) Flags() uint32 { return m.flags } |
| 34 func (m *mcItem) Expiration() time.Duration { return m.expiration } | 33 func (m *mcItem) Expiration() time.Duration { return m.expiration } |
| 35 | 34 |
| 36 func (m *mcItem) SetKey(key string) mc.Item { | 35 func (m *mcItem) SetKey(key string) mc.Item { |
| 37 m.key = key | 36 m.key = key |
| 38 return m | 37 return m |
| 39 } | 38 } |
| 40 func (m *mcItem) SetValue(val []byte) mc.Item { | 39 func (m *mcItem) SetValue(val []byte) mc.Item { |
| 41 m.value = val | 40 m.value = val |
| 42 return m | 41 return m |
| 43 } | 42 } |
| 44 func (m *mcItem) SetObject(obj interface{}) mc.Item { | |
| 45 m.object = obj | |
| 46 return m | |
| 47 } | |
| 48 func (m *mcItem) SetFlags(flg uint32) mc.Item { | 43 func (m *mcItem) SetFlags(flg uint32) mc.Item { |
| 49 m.flags = flg | 44 m.flags = flg |
| 50 return m | 45 return m |
| 51 } | 46 } |
| 52 func (m *mcItem) SetExpiration(exp time.Duration) mc.Item { | 47 func (m *mcItem) SetExpiration(exp time.Duration) mc.Item { |
| 53 m.expiration = exp | 48 m.expiration = exp |
| 54 return m | 49 return m |
| 55 } | 50 } |
| 56 | 51 |
| 57 func (m *mcItem) duplicate() *mcItem { | 52 func (m *mcItem) SetAll(other mc.Item) { |
| 53 » *m = *other.(*mcItem).duplicate(false) |
| 54 } |
| 55 |
| 56 func (m *mcItem) duplicate(deep bool) *mcItem { |
| 58 ret := mcItem{} | 57 ret := mcItem{} |
| 59 ret = *m | 58 ret = *m |
| 60 » ret.value = make([]byte, len(m.value)) | 59 » if deep { |
| 61 » copy(ret.value, m.value) | 60 » » ret.value = make([]byte, len(m.value)) |
| 61 » » copy(ret.value, m.value) |
| 62 » } |
| 62 return &ret | 63 return &ret |
| 63 } | 64 } |
| 64 | 65 |
| 65 type memcacheData struct { | 66 type memcacheData struct { |
| 66 » lock sync.Mutex | 67 » lock sync.RWMutex |
| 67 items map[string]*mcItem | 68 items map[string]*mcItem |
| 68 casID uint64 | 69 casID uint64 |
| 70 |
| 71 stats mc.Statistics |
| 72 } |
| 73 |
| 74 func (m *memcacheData) mkItemLocked(now time.Time, i mc.Item) (ret *mcItem) { |
| 75 m.casID++ |
| 76 |
| 77 var exp time.Duration |
| 78 if i.Expiration() != 0 { |
| 79 exp = time.Duration(now.Add(i.Expiration()).UnixNano()) |
| 80 } |
| 81 value := make([]byte, len(i.Value())) |
| 82 copy(value, i.Value()) |
| 83 return &mcItem{ |
| 84 key: i.Key(), |
| 85 flags: i.Flags(), |
| 86 expiration: exp, |
| 87 value: value, |
| 88 CasID: m.casID, |
| 89 } |
| 90 } |
| 91 |
| 92 func (m *memcacheData) setItemLocked(now time.Time, i mc.Item) { |
| 93 if cur, ok := m.items[i.Key()]; ok { |
| 94 m.stats.Items-- |
| 95 m.stats.Bytes -= uint64(len(cur.value)) |
| 96 } |
| 97 m.stats.Items++ |
| 98 m.stats.Bytes += uint64(len(i.Value())) |
| 99 m.items[i.Key()] = m.mkItemLocked(now, i) |
| 100 } |
| 101 |
| 102 func (m *memcacheData) delItemLocked(k string) { |
| 103 if itm, ok := m.items[k]; ok { |
| 104 m.stats.Items-- |
| 105 m.stats.Bytes -= uint64(len(itm.value)) |
| 106 delete(m.items, k) |
| 107 } |
| 108 } |
| 109 |
| 110 func (m *memcacheData) reset() { |
| 111 m.stats = mc.Statistics{} |
| 112 m.items = map[string]*mcItem{} |
| 113 } |
| 114 |
| 115 func (m *memcacheData) hasItemLocked(now time.Time, key string) bool { |
| 116 ret, ok := m.items[key] |
| 117 if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(now.U
nixNano()) { |
| 118 m.delItemLocked(key) |
| 119 return false |
| 120 } |
| 121 return ok |
| 122 } |
| 123 |
| 124 func (m *memcacheData) retrieveLocked(now time.Time, key string) (*mcItem, error
) { |
| 125 if !m.hasItemLocked(now, key) { |
| 126 m.stats.Misses++ |
| 127 return nil, mc.ErrCacheMiss |
| 128 } |
| 129 |
| 130 ret := m.items[key] |
| 131 m.stats.Hits++ |
| 132 m.stats.ByteHits += uint64(len(ret.value)) |
| 133 return ret, nil |
| 69 } | 134 } |
| 70 | 135 |
| 71 // memcacheImpl binds the current connection's memcache data to an | 136 // memcacheImpl binds the current connection's memcache data to an |
| 72 // implementation of {gae.Memcache, gae.Testable}. | 137 // implementation of {gae.Memcache, gae.Testable}. |
| 73 type memcacheImpl struct { | 138 type memcacheImpl struct { |
| 74 mc.Interface | |
| 75 | |
| 76 data *memcacheData | 139 data *memcacheData |
| 77 ctx context.Context | 140 ctx context.Context |
| 78 } | 141 } |
| 79 | 142 |
| 80 var _ mc.Interface = (*memcacheImpl)(nil) | 143 var _ mc.RawInterface = (*memcacheImpl)(nil) |
| 81 | 144 |
| 82 // useMC adds a gae.Memcache implementation to context, accessible | 145 // useMC adds a gae.Memcache implementation to context, accessible |
| 83 // by gae.GetMC(c) | 146 // by gae.GetMC(c) |
| 84 func useMC(c context.Context) context.Context { | 147 func useMC(c context.Context) context.Context { |
| 85 lck := sync.Mutex{} | 148 lck := sync.Mutex{} |
| 86 mcdMap := map[string]*memcacheData{} | 149 mcdMap := map[string]*memcacheData{} |
| 87 | 150 |
| 88 » return mc.SetFactory(c, func(ic context.Context) mc.Interface { | 151 » return mc.SetRawFactory(c, func(ic context.Context) mc.RawInterface { |
| 89 lck.Lock() | 152 lck.Lock() |
| 90 defer lck.Unlock() | 153 defer lck.Unlock() |
| 91 | 154 |
| 92 ns := curGID(ic).namespace | 155 ns := curGID(ic).namespace |
| 93 mcd, ok := mcdMap[ns] | 156 mcd, ok := mcdMap[ns] |
| 94 if !ok { | 157 if !ok { |
| 95 mcd = &memcacheData{items: map[string]*mcItem{}} | 158 mcd = &memcacheData{items: map[string]*mcItem{}} |
| 96 mcdMap[ns] = mcd | 159 mcdMap[ns] = mcd |
| 97 } | 160 } |
| 98 | 161 |
| 99 return &memcacheImpl{ | 162 return &memcacheImpl{ |
| 100 dummy.Memcache(), | |
| 101 mcd, | 163 mcd, |
| 102 ic, | 164 ic, |
| 103 } | 165 } |
| 104 }) | 166 }) |
| 105 } | 167 } |
| 106 | 168 |
| 107 func (m *memcacheImpl) mkItemLocked(i mc.Item) (ret *mcItem) { | |
| 108 m.data.casID++ | |
| 109 | |
| 110 var exp time.Duration | |
| 111 if i.Expiration() != 0 { | |
| 112 exp = time.Duration(clock.Now(m.ctx).Add(i.Expiration()).UnixNan
o()) | |
| 113 } | |
| 114 newItem := mcItem{ | |
| 115 key: i.Key(), | |
| 116 flags: i.Flags(), | |
| 117 expiration: exp, | |
| 118 value: i.Value(), | |
| 119 CasID: m.data.casID, | |
| 120 } | |
| 121 return newItem.duplicate() | |
| 122 } | |
| 123 | |
| 124 func (m *memcacheImpl) NewItem(key string) mc.Item { | 169 func (m *memcacheImpl) NewItem(key string) mc.Item { |
| 125 return &mcItem{key: key} | 170 return &mcItem{key: key} |
| 126 } | 171 } |
| 127 | 172 |
| 128 // Add implements context.MCSingleReadWriter.Add. | 173 func doCBs(items []mc.Item, cb mc.RawCB, inner func(mc.Item) error) { |
| 129 func (m *memcacheImpl) Add(i mc.Item) error { | 174 » // This weird construction is so that we: |
| 175 » // - don't take the lock for the entire multi operation, since it coul
d imply |
| 176 » // false atomicity. |
| 177 » // - don't allow cb to block the actual batch operation, since that wo
uld |
| 178 » // allow binding in ways that aren't possible under the real |
| 179 » // implementation (like a recursive deadlock) |
| 180 » errs := make([]error, len(items)) |
| 181 » for i, itm := range items { |
| 182 » » errs[i] = inner(itm) |
| 183 » } |
| 184 » for _, e := range errs { |
| 185 » » cb(e) |
| 186 » } |
| 187 } |
| 188 |
| 189 func (m *memcacheImpl) AddMulti(items []mc.Item, cb mc.RawCB) error { |
| 190 » now := clock.Now(m.ctx) |
| 191 » doCBs(items, cb, func(itm mc.Item) error { |
| 192 » » m.data.lock.Lock() |
| 193 » » defer m.data.lock.Unlock() |
| 194 » » if !m.data.hasItemLocked(now, itm.Key()) { |
| 195 » » » m.data.setItemLocked(now, itm) |
| 196 » » » return nil |
| 197 » » } else { |
| 198 » » » return (mc.ErrNotStored) |
| 199 » » } |
| 200 » }) |
| 201 » return nil |
| 202 } |
| 203 |
| 204 func (m *memcacheImpl) CompareAndSwapMulti(items []mc.Item, cb mc.RawCB) error { |
| 205 » now := clock.Now(m.ctx) |
| 206 » doCBs(items, cb, func(itm mc.Item) error { |
| 207 » » m.data.lock.Lock() |
| 208 » » defer m.data.lock.Unlock() |
| 209 |
| 210 » » if cur, err := m.data.retrieveLocked(now, itm.Key()); err == nil
{ |
| 211 » » » casid := uint64(0) |
| 212 » » » if mi, ok := itm.(*mcItem); ok && mi != nil { |
| 213 » » » » casid = mi.CasID |
| 214 » » » } |
| 215 |
| 216 » » » if cur.CasID == casid { |
| 217 » » » » m.data.setItemLocked(now, itm) |
| 218 » » » } else { |
| 219 » » » » return mc.ErrCASConflict |
| 220 » » » } |
| 221 » » » return nil |
| 222 » » } |
| 223 » » return mc.ErrNotStored |
| 224 » }) |
| 225 » return nil |
| 226 } |
| 227 |
| 228 func (m *memcacheImpl) SetMulti(items []mc.Item, cb mc.RawCB) error { |
| 229 » now := clock.Now(m.ctx) |
| 230 » doCBs(items, cb, func(itm mc.Item) error { |
| 231 » » m.data.lock.Lock() |
| 232 » » defer m.data.lock.Unlock() |
| 233 » » m.data.setItemLocked(now, itm) |
| 234 » » return nil |
| 235 » }) |
| 236 » return nil |
| 237 } |
| 238 |
| 239 func (m *memcacheImpl) GetMulti(keys []string, cb mc.RawItemCB) error { |
| 240 » now := clock.Now(m.ctx) |
| 241 |
| 242 » itms := make([]mc.Item, len(keys)) |
| 243 » errs := make([]error, len(keys)) |
| 244 |
| 245 » for i, k := range keys { |
| 246 » » itms[i], errs[i] = func() (mc.Item, error) { |
| 247 » » » m.data.lock.RLock() |
| 248 » » » defer m.data.lock.RUnlock() |
| 249 » » » val, err := m.data.retrieveLocked(now, k) |
| 250 » » » if err != nil { |
| 251 » » » » return nil, err |
| 252 » » » } |
| 253 » » » return val.duplicate(true).SetExpiration(0), nil |
| 254 » » }() |
| 255 » } |
| 256 |
| 257 » for i, itm := range itms { |
| 258 » » cb(itm, errs[i]) |
| 259 » } |
| 260 |
| 261 » return nil |
| 262 } |
| 263 |
| 264 func (m *memcacheImpl) DeleteMulti(keys []string, cb mc.RawCB) error { |
| 265 » now := clock.Now(m.ctx) |
| 266 |
| 267 » errs := make([]error, len(keys)) |
| 268 |
| 269 » for i, k := range keys { |
| 270 » » errs[i] = func() error { |
| 271 » » » m.data.lock.Lock() |
| 272 » » » defer m.data.lock.Unlock() |
| 273 » » » _, err := m.data.retrieveLocked(now, k) |
| 274 » » » if err != nil { |
| 275 » » » » return err |
| 276 » » » } |
| 277 » » » m.data.delItemLocked(k) |
| 278 » » » return nil |
| 279 » » }() |
| 280 » } |
| 281 |
| 282 » for _, e := range errs { |
| 283 » » cb(e) |
| 284 » } |
| 285 |
| 286 » return nil |
| 287 } |
| 288 |
| 289 func (m *memcacheImpl) Flush() error { |
| 130 m.data.lock.Lock() | 290 m.data.lock.Lock() |
| 131 defer m.data.lock.Unlock() | 291 defer m.data.lock.Unlock() |
| 132 | 292 |
| 133 » if _, ok := m.retrieveLocked(i.Key()); !ok { | 293 » m.data.reset() |
| 134 » » m.data.items[i.Key()] = m.mkItemLocked(i) | 294 » return nil |
| 135 » » return nil | |
| 136 » } | |
| 137 » return mc.ErrNotStored | |
| 138 } | 295 } |
| 139 | 296 |
| 140 // CompareAndSwap implements context.MCSingleReadWriter.CompareAndSwap. | 297 func (m *memcacheImpl) Increment(key string, delta int64, initialValue *uint64)
(uint64, error) { |
| 141 func (m *memcacheImpl) CompareAndSwap(item mc.Item) error { | 298 » now := clock.Now(m.ctx) |
| 299 |
| 142 m.data.lock.Lock() | 300 m.data.lock.Lock() |
| 143 defer m.data.lock.Unlock() | 301 defer m.data.lock.Unlock() |
| 144 | 302 |
| 145 » if cur, ok := m.retrieveLocked(item.Key()); ok { | 303 » cur := uint64(0) |
| 146 » » casid := uint64(0) | 304 » if initialValue == nil { |
| 147 » » if mi, ok := item.(*mcItem); ok && mi != nil { | 305 » » curItm, err := m.data.retrieveLocked(now, key) |
| 148 » » » casid = mi.CasID | 306 » » if err != nil { |
| 307 » » » return 0, err |
| 149 } | 308 } |
| 150 | 309 » » if len(curItm.value) != 8 { |
| 151 » » if cur.CasID == casid { | 310 » » » return 0, errors.New("memcache Increment: got invalid cu
rrent value") |
| 152 » » » m.data.items[item.Key()] = m.mkItemLocked(item) | 311 » » } |
| 312 » » cur = binary.LittleEndian.Uint64(curItm.value) |
| 313 » } else { |
| 314 » » cur = *initialValue |
| 315 » } |
| 316 » if delta < 0 { |
| 317 » » if uint64(-delta) > cur { |
| 318 » » » cur = 0 |
| 153 } else { | 319 } else { |
| 154 » » » return mc.ErrCASConflict | 320 » » » cur -= uint64(-delta) |
| 155 } | 321 } |
| 156 } else { | 322 } else { |
| 157 » » return mc.ErrNotStored | 323 » » cur += uint64(delta) |
| 158 } | 324 } |
| 159 » return nil | 325 |
| 326 » newval := make([]byte, 8) |
| 327 » binary.LittleEndian.PutUint64(newval, cur) |
| 328 » m.data.setItemLocked(now, m.NewItem(key).SetValue(newval)) |
| 329 |
| 330 » return cur, nil |
| 160 } | 331 } |
| 161 | 332 |
| 162 // Set implements context.MCSingleReadWriter.Set. | 333 func (m *memcacheImpl) Stats() (*mc.Statistics, error) { |
| 163 func (m *memcacheImpl) Set(i mc.Item) error { | 334 » m.data.lock.RLock() |
| 164 » m.data.lock.Lock() | 335 » defer m.data.lock.RUnlock() |
| 165 » defer m.data.lock.Unlock() | 336 |
| 166 » m.data.items[i.Key()] = m.mkItemLocked(i) | 337 » ret := m.data.stats |
| 167 » return nil | 338 » return &ret, nil |
| 168 } | 339 } |
| 169 | |
| 170 // Get implements context.MCSingleReadWriter.Get. | |
| 171 func (m *memcacheImpl) Get(key string) (itm mc.Item, err error) { | |
| 172 m.data.lock.Lock() | |
| 173 defer m.data.lock.Unlock() | |
| 174 if val, ok := m.retrieveLocked(key); ok { | |
| 175 itm = val.duplicate().SetExpiration(0) | |
| 176 } else { | |
| 177 err = mc.ErrCacheMiss | |
| 178 } | |
| 179 return | |
| 180 } | |
| 181 | |
| 182 // Delete implements context.MCSingleReadWriter.Delete. | |
| 183 func (m *memcacheImpl) Delete(key string) error { | |
| 184 m.data.lock.Lock() | |
| 185 defer m.data.lock.Unlock() | |
| 186 | |
| 187 if _, ok := m.retrieveLocked(key); ok { | |
| 188 delete(m.data.items, key) | |
| 189 return nil | |
| 190 } | |
| 191 return mc.ErrCacheMiss | |
| 192 } | |
| 193 | |
| 194 func (m *memcacheImpl) retrieveLocked(key string) (*mcItem, bool) { | |
| 195 ret, ok := m.data.items[key] | |
| 196 if ok && ret.Expiration() != 0 && ret.Expiration() < time.Duration(clock
.Now(m.ctx).UnixNano()) { | |
| 197 ret = nil | |
| 198 ok = false | |
| 199 delete(m.data.items, key) | |
| 200 } | |
| 201 return ret, ok | |
| 202 } | |
| OLD | NEW |