OLD | NEW |
(Empty) | |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package dscache |
| 6 |
| 7 import ( |
| 8 "bytes" |
| 9 "fmt" |
| 10 "math/rand" |
| 11 "time" |
| 12 |
| 13 ds "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/gae/service/memcache" |
| 15 "github.com/luci/luci-go/common/errors" |
| 16 "github.com/luci/luci-go/common/logging" |
| 17 "golang.org/x/net/context" |
| 18 ) |
| 19 |
| 20 type dsCache struct { |
| 21 ds.RawInterface |
| 22 |
| 23 aid string |
| 24 ns string |
| 25 |
| 26 mc memcache.Interface |
| 27 log logging.Logger |
| 28 mr *rand.Rand |
| 29 } |
| 30 |
| 31 var _ ds.RawInterface = (*dsCache)(nil) |
| 32 |
| 33 func (d *dsCache) mutation(keys []ds.Key, f func() error) error { |
| 34 lockItems, lockKeys := mkAllLockItems(d.mc, keys) |
| 35 if lockItems == nil { |
| 36 return f() |
| 37 } |
| 38 if err := d.mc.SetMulti(lockItems); err != nil { |
| 39 d.log.Errorf("dscache: mc.SetMulti: %s", err) |
| 40 } |
| 41 err := f() |
| 42 if err == nil { |
| 43 if err := d.mc.DeleteMulti(lockKeys); err != nil { |
| 44 d.log.Errorf("dscache: mc.DeleteMulti: %s", err) |
| 45 } |
| 46 } |
| 47 return err |
| 48 } |
| 49 |
| 50 func (d *dsCache) DeleteMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
| 51 return d.mutation(keys, func() error { |
| 52 return d.RawInterface.DeleteMulti(keys, cb) |
| 53 }) |
| 54 } |
| 55 |
| 56 func (d *dsCache) PutMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiC
B) error { |
| 57 return d.mutation(keys, func() error { |
| 58 return d.RawInterface.PutMulti(keys, vals, cb) |
| 59 }) |
| 60 } |
| 61 |
| 62 type facts struct { |
| 63 getKeys []ds.Key |
| 64 getMeta []ds.PropertyMap |
| 65 lockItems []memcache.Item |
| 66 } |
| 67 |
| 68 type plan struct { |
| 69 keepMeta bool |
| 70 |
| 71 idxMap []int |
| 72 toGet []ds.Key |
| 73 toGetMeta []ds.PropertyMap |
| 74 toSave []memcache.Item |
| 75 |
| 76 decoded []ds.PropertyMap |
| 77 lme errors.LazyMultiError |
| 78 } |
| 79 |
| 80 func (p *plan) add(idx int, get ds.Key, m ds.PropertyMap, save memcache.Item) { |
| 81 p.idxMap = append(p.idxMap, idx) |
| 82 p.toGet = append(p.toGet, get) |
| 83 |
| 84 if save != nil { |
| 85 save.SetFlags(uint32(ItemHasData)) |
| 86 } |
| 87 p.toSave = append(p.toSave, save) |
| 88 |
| 89 if p.keepMeta { |
| 90 p.toGetMeta = append(p.toGetMeta, m) |
| 91 } |
| 92 } |
| 93 |
| 94 func (p *plan) empty() bool { |
| 95 return len(p.idxMap) == 0 |
| 96 } |
| 97 |
| 98 func (d *dsCache) makePlan(f *facts) *plan { |
| 99 // get index -> items index |
| 100 p := plan{ |
| 101 keepMeta: f.getMeta != nil, |
| 102 decoded: make([]ds.PropertyMap, len(f.lockItems)), |
| 103 lme: errors.LazyMultiError{Size: len(f.lockItems)}, |
| 104 } |
| 105 for i, lockItm := range f.lockItems { |
| 106 curItm := f.lockItems[i] |
| 107 m := ds.PropertyMap(nil) |
| 108 if f.getMeta != nil { |
| 109 m = f.getMeta[i] |
| 110 } |
| 111 |
| 112 getKey := f.getKeys[i] |
| 113 if !meta(m, CacheEnableMeta, true).(bool) { |
| 114 fmt.Println("cache is disabled!") |
| 115 p.add(i, getKey, m, nil) |
| 116 continue |
| 117 } |
| 118 |
| 119 if curItm == nil { |
| 120 fmt.Println("missing item in memcache!") |
| 121 p.add(i, getKey, m, nil) |
| 122 continue |
| 123 } |
| 124 |
| 125 flg := FlagValue(curItm.Flags()) |
| 126 if flg == ItemHasLock { |
| 127 if !bytes.Equal(curItm.Value(), lockItm.Value()) { |
| 128 curItm = nil // someone else has the lock, don't
save |
| 129 } |
| 130 fmt.Println("found lock item in memcache!") |
| 131 p.add(i, getKey, m, curItm) |
| 132 continue |
| 133 } |
| 134 |
| 135 pmap, err := decodeValue(curItm.Value(), d.aid, d.ns) |
| 136 switch err { |
| 137 case nil: |
| 138 p.decoded[i] = pmap |
| 139 case ds.ErrNoSuchEntity: |
| 140 p.lme.Assign(i, ds.ErrNoSuchEntity) |
| 141 default: |
| 142 d.log.Errorf("dscache: error decoding %s, %s: %s", curIt
m.Key(), getKey, err) |
| 143 fmt.Println("failed to decode!") |
| 144 p.add(i, getKey, m, curItm) |
| 145 } |
| 146 } |
| 147 return &p |
| 148 } |
| 149 |
| 150 func (d *dsCache) GetMulti(keys []ds.Key, metas []ds.PropertyMap, cb ds.GetMulti
CB) error { |
| 151 lockItems := mkRandLockItems(d.mc, d.mr, keys) |
| 152 if lockItems == nil { |
| 153 return d.RawInterface.GetMulti(keys, metas, cb) |
| 154 } |
| 155 |
| 156 if err := d.mc.AddMulti(lockItems); err != nil { |
| 157 d.log.Errorf("dscache: mc.AddMulti: %s", err) |
| 158 } |
| 159 if err := d.mc.GetMulti(lockItems); err != nil { |
| 160 d.log.Errorf("dscache: mc.GetMulti: %s", err) |
| 161 } |
| 162 |
| 163 p := d.makePlan(&facts{keys, metas, lockItems}) |
| 164 |
| 165 if !p.empty() { |
| 166 fmt.Println("have to touch DS :(", p) |
| 167 toCas := []memcache.Item{} |
| 168 j := 0 |
| 169 err := d.RawInterface.GetMulti(p.toGet, p.toGetMeta, func(pm ds.
PropertyMap, err error) { |
| 170 i := p.idxMap[j] |
| 171 toSave := p.toSave[j] |
| 172 |
| 173 data := []byte(nil) |
| 174 |
| 175 if err == nil { |
| 176 p.decoded[i] = pm |
| 177 if toSave != nil { |
| 178 data, err = mkItemData(pm) |
| 179 } |
| 180 } |
| 181 |
| 182 canSave := true |
| 183 if err != nil { |
| 184 p.lme.Assign(i, err) |
| 185 canSave = err == ds.ErrNoSuchEntity |
| 186 } |
| 187 |
| 188 if canSave && toSave != nil { |
| 189 m := ds.PropertyMap(nil) |
| 190 if p.toGetMeta != nil { |
| 191 m = p.toGetMeta[j] |
| 192 } |
| 193 expSecs := meta(m, CacheExpirationMeta, CacheTim
eSeconds).(int64) |
| 194 toSave.SetExpiration(time.Duration(expSecs) * ti
me.Second) |
| 195 toSave.SetValue(data) |
| 196 toCas = append(toCas, toSave) |
| 197 } |
| 198 |
| 199 j++ |
| 200 }) |
| 201 if err != nil { |
| 202 return err |
| 203 } |
| 204 if len(toCas) > 0 { |
| 205 if err := d.mc.CompareAndSwapMulti(toCas); err != nil { |
| 206 d.log.Errorf("dscache: CompareAndSwapMulti: %s",
err) |
| 207 } |
| 208 } |
| 209 } |
| 210 |
| 211 for i, dec := range p.decoded { |
| 212 cb(dec, p.lme.GetOne(i)) |
| 213 } |
| 214 |
| 215 return nil |
| 216 } |
| 217 |
| 218 func (d *dsCache) RunInTransaction(f func(context.Context) error, opts *ds.Trans
actionOptions) error { |
| 219 txnState := dsTxnState{} |
| 220 err := d.RawInterface.RunInTransaction(func(ctx context.Context) error { |
| 221 txnState.Reset() |
| 222 err := f(context.WithValue(ctx, dsTxnCacheKey, &txnState)) |
| 223 if err != nil { |
| 224 txnState.Apply(d.mc, d.log) |
| 225 } |
| 226 return err |
| 227 }, opts) |
| 228 if err == nil { |
| 229 txnState.Release(d.mc, d.log) |
| 230 } |
| 231 return err |
| 232 } |
OLD | NEW |