Index: filter/dscache/ds.go |
diff --git a/filter/dscache/ds.go b/filter/dscache/ds.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..1a3e685cb39f7bb1457a3a4f97c5f75c6a5ef22d |
--- /dev/null |
+++ b/filter/dscache/ds.go |
@@ -0,0 +1,232 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package dscache |
+ |
+import ( |
+ "bytes" |
+ "fmt" |
+ "math/rand" |
+ "time" |
+ |
+ ds "github.com/luci/gae/service/datastore" |
+ "github.com/luci/gae/service/memcache" |
+ "github.com/luci/luci-go/common/errors" |
+ "github.com/luci/luci-go/common/logging" |
+ "golang.org/x/net/context" |
+) |
+ |
+type dsCache struct { |
+ ds.RawInterface |
+ |
+ aid string |
+ ns string |
+ |
+ mc memcache.Interface |
+ log logging.Logger |
+ mr *rand.Rand |
+} |
+ |
+var _ ds.RawInterface = (*dsCache)(nil) |
+ |
+func (d *dsCache) mutation(keys []ds.Key, f func() error) error { |
+ lockItems, lockKeys := mkAllLockItems(d.mc, keys) |
+ if lockItems == nil { |
+ return f() |
+ } |
+ if err := d.mc.SetMulti(lockItems); err != nil { |
+ d.log.Errorf("dscache: mc.SetMulti: %s", err) |
+ } |
+ err := f() |
+ if err == nil { |
+ if err := d.mc.DeleteMulti(lockKeys); err != nil { |
+ d.log.Errorf("dscache: mc.DeleteMulti: %s", err) |
+ } |
+ } |
+ return err |
+} |
+ |
+func (d *dsCache) DeleteMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
+ return d.mutation(keys, func() error { |
+ return d.RawInterface.DeleteMulti(keys, cb) |
+ }) |
+} |
+ |
+func (d *dsCache) PutMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error { |
+ return d.mutation(keys, func() error { |
+ return d.RawInterface.PutMulti(keys, vals, cb) |
+ }) |
+} |
+ |
+type facts struct { |
+ getKeys []ds.Key |
+ getMeta []ds.PropertyMap |
+ lockItems []memcache.Item |
+} |
+ |
+type plan struct { |
+ keepMeta bool |
+ |
+ idxMap []int |
+ toGet []ds.Key |
+ toGetMeta []ds.PropertyMap |
+ toSave []memcache.Item |
+ |
+ decoded []ds.PropertyMap |
+ lme errors.LazyMultiError |
+} |
+ |
+func (p *plan) add(idx int, get ds.Key, m ds.PropertyMap, save memcache.Item) { |
+ p.idxMap = append(p.idxMap, idx) |
+ p.toGet = append(p.toGet, get) |
+ |
+ if save != nil { |
+ save.SetFlags(uint32(ItemHasData)) |
+ } |
+ p.toSave = append(p.toSave, save) |
+ |
+ if p.keepMeta { |
+ p.toGetMeta = append(p.toGetMeta, m) |
+ } |
+} |
+ |
+func (p *plan) empty() bool { |
+ return len(p.idxMap) == 0 |
+} |
+ |
+func (d *dsCache) makePlan(f *facts) *plan { |
+ // get index -> items index |
+ p := plan{ |
+ keepMeta: f.getMeta != nil, |
+ decoded: make([]ds.PropertyMap, len(f.lockItems)), |
+ lme: errors.LazyMultiError{Size: len(f.lockItems)}, |
+ } |
+ for i, lockItm := range f.lockItems { |
+ curItm := f.lockItems[i] |
+ m := ds.PropertyMap(nil) |
+ if f.getMeta != nil { |
+ m = f.getMeta[i] |
+ } |
+ |
+ getKey := f.getKeys[i] |
+ if !meta(m, CacheEnableMeta, true).(bool) { |
+ fmt.Println("cache is disabled!") |
+ p.add(i, getKey, m, nil) |
+ continue |
+ } |
+ |
+ if curItm == nil { |
+ fmt.Println("missing item in memcache!") |
+ p.add(i, getKey, m, nil) |
+ continue |
+ } |
+ |
+ flg := FlagValue(curItm.Flags()) |
+ if flg == ItemHasLock { |
+ if !bytes.Equal(curItm.Value(), lockItm.Value()) { |
+ curItm = nil // someone else has the lock, don't save |
+ } |
+ fmt.Println("found lock item in memcache!") |
+ p.add(i, getKey, m, curItm) |
+ continue |
+ } |
+ |
+ pmap, err := decodeValue(curItm.Value(), d.aid, d.ns) |
+ switch err { |
+ case nil: |
+ p.decoded[i] = pmap |
+ case ds.ErrNoSuchEntity: |
+ p.lme.Assign(i, ds.ErrNoSuchEntity) |
+ default: |
+ d.log.Errorf("dscache: error decoding %s, %s: %s", curItm.Key(), getKey, err) |
+ fmt.Println("failed to decode!") |
+ p.add(i, getKey, m, curItm) |
+ } |
+ } |
+ return &p |
+} |
+ |
+func (d *dsCache) GetMulti(keys []ds.Key, metas []ds.PropertyMap, cb ds.GetMultiCB) error { |
+ lockItems := mkRandLockItems(d.mc, d.mr, keys) |
+ if lockItems == nil { |
+ return d.RawInterface.GetMulti(keys, metas, cb) |
+ } |
+ |
+ if err := d.mc.AddMulti(lockItems); err != nil { |
+ d.log.Errorf("dscache: mc.AddMulti: %s", err) |
+ } |
+ if err := d.mc.GetMulti(lockItems); err != nil { |
+ d.log.Errorf("dscache: mc.GetMulti: %s", err) |
+ } |
+ |
+ p := d.makePlan(&facts{keys, metas, lockItems}) |
+ |
+ if !p.empty() { |
+ fmt.Println("have to touch DS :(", p) |
+ toCas := []memcache.Item{} |
+ j := 0 |
+ err := d.RawInterface.GetMulti(p.toGet, p.toGetMeta, func(pm ds.PropertyMap, err error) { |
+ i := p.idxMap[j] |
+ toSave := p.toSave[j] |
+ |
+ data := []byte(nil) |
+ |
+ if err == nil { |
+ p.decoded[i] = pm |
+ if toSave != nil { |
+ data, err = mkItemData(pm) |
+ } |
+ } |
+ |
+ canSave := true |
+ if err != nil { |
+ p.lme.Assign(i, err) |
+ canSave = err == ds.ErrNoSuchEntity |
+ } |
+ |
+ if canSave && toSave != nil { |
+ m := ds.PropertyMap(nil) |
+ if p.toGetMeta != nil { |
+ m = p.toGetMeta[j] |
+ } |
+ expSecs := meta(m, CacheExpirationMeta, CacheTimeSeconds).(int64) |
+ toSave.SetExpiration(time.Duration(expSecs) * time.Second) |
+ toSave.SetValue(data) |
+ toCas = append(toCas, toSave) |
+ } |
+ |
+ j++ |
+ }) |
+ if err != nil { |
+ return err |
+ } |
+ if len(toCas) > 0 { |
+ if err := d.mc.CompareAndSwapMulti(toCas); err != nil { |
+ d.log.Errorf("dscache: CompareAndSwapMulti: %s", err) |
+ } |
+ } |
+ } |
+ |
+ for i, dec := range p.decoded { |
+ cb(dec, p.lme.GetOne(i)) |
+ } |
+ |
+ return nil |
+} |
+ |
+func (d *dsCache) RunInTransaction(f func(context.Context) error, opts *ds.TransactionOptions) error { |
+ txnState := dsTxnState{} |
+ err := d.RawInterface.RunInTransaction(func(ctx context.Context) error { |
+ txnState.Reset() |
+ err := f(context.WithValue(ctx, dsTxnCacheKey, &txnState)) |
+ if err != nil { |
+ txnState.Apply(d.mc, d.log) |
+ } |
+ return err |
+ }, opts) |
+ if err == nil { |
+ txnState.Release(d.mc, d.log) |
+ } |
+ return err |
+} |