Index: filter/dscache/ds.go |
diff --git a/filter/dscache/ds.go b/filter/dscache/ds.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..ee3d98b80c3d0ac2292c8d4d28a1c2bffffa7d8d |
--- /dev/null |
+++ b/filter/dscache/ds.go |
@@ -0,0 +1,131 @@ |
+// Copyright 2015 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package dscache |
+ |
+import ( |
+ "time" |
+ |
+ ds "github.com/luci/gae/service/datastore" |
+ "github.com/luci/gae/service/memcache" |
+ "golang.org/x/net/context" |
+) |
+ |
+type dsCache struct { |
+ ds.RawInterface |
+ |
+ *supportContext |
+} |
+ |
+var _ ds.RawInterface = (*dsCache)(nil) |
+ |
+func (d *dsCache) DeleteMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
+ return d.mutation(keys, func() error { |
+ return d.RawInterface.DeleteMulti(keys, cb) |
+ }) |
+} |
+ |
+func (d *dsCache) PutMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error { |
+ return d.mutation(keys, func() error { |
+ return d.RawInterface.PutMulti(keys, vals, cb) |
+ }) |
+} |
+ |
+func (d *dsCache) GetMulti(keys []ds.Key, metas ds.MultiMetaGetter, cb ds.GetMultiCB) error { |
+ lockItems, nonce := d.mkRandLockItems(keys, metas) |
+ if len(lockItems) == 0 { |
+ return d.RawInterface.GetMulti(keys, metas, cb) |
+ } |
+ |
+ if err := d.mc.AddMulti(lockItems); err != nil { |
Vadim Sh.
2015/08/06 01:23:33
what happens if only one item is failed to lock? (
iannucci
2015/08/06 02:37:33
it's not atomic, but it doesn't matter. Either:
|
+ d.log.Errorf("dscache: mc.AddMulti: %s", err) |
dnj
2015/08/05 18:32:17
(Debugf)
iannucci
2015/08/06 01:54:01
Did warn
|
+ } |
+ if err := d.mc.GetMulti(lockItems); err != nil { |
+ d.log.Errorf("dscache: mc.GetMulti: %s", err) |
dnj
2015/08/05 18:32:17
(Debugf)
iannucci
2015/08/06 01:54:01
Did warn
|
+ } |
+ |
+ p := makePlan(d.aid, d.ns, d.log, &facts{keys, metas, lockItems, nonce}) |
+ |
+ if !p.empty() { |
dnj
2015/08/05 18:32:17
Invert: if p.empty() { return nil }
iannucci
2015/08/06 01:54:01
can't, this modifies p.decoded for the loop below
dnj
2015/08/07 16:30:06
Acknowledged.
|
+ toCas := []memcache.Item{} |
dnj
2015/08/05 18:32:17
Might as well optimistically allocate capacity.
iannucci
2015/08/06 01:54:01
Eh... estimating the amount to allocate is hard en
dnj
2015/08/07 16:30:06
Acknowledged.
|
+ j := 0 |
+ err := d.RawInterface.GetMulti(p.toGet, p.toGetMeta, func(pm ds.PropertyMap, err error) { |
+ i := p.idxMap[j] |
+ toSave := p.toSave[j] |
Vadim Sh.
2015/08/06 01:23:33
ah, I guess appending nils is by design. Looked we
Vadim Sh.
2015/08/06 01:23:33
nit: move j++ to this line. I forgot what "j" was
|
+ |
+ data := []byte(nil) |
+ |
+ // 0 == do not save |
+ // 1 == save |
+ // 2 == lock until next put |
+ saveState := 1 |
+ if err == nil { |
+ p.decoded[i] = pm |
+ if toSave != nil { |
+ data = encodeItemValue(pm) |
+ if len(data) > internalValueSizeLimit { |
+ saveState = 2 |
Vadim Sh.
2015/08/06 01:23:33
heh... smart :)
iannucci
2015/08/06 02:37:33
managed to reduce this to just a boolean, but yeah
|
+ d.log.Warningf("dscache: encoded entity too big (%d/%d)!", |
+ len(data), internalValueSizeLimit) |
+ } |
+ } |
+ } |
+ |
+ if err != nil { |
dnj
2015/08/05 18:32:17
} else {
iannucci
2015/08/06 01:54:01
Done.
|
+ p.lme.Assign(i, err) |
+ if err != ds.ErrNoSuchEntity { |
+ saveState = 0 |
+ } |
+ } |
+ |
+ if saveState > 0 && toSave != nil { |
+ if saveState == 1 { // save |
+ expSecs := metas.GetMetaDefault(i, CacheExpirationMeta, CacheTimeSeconds).(int64) |
+ toSave.SetFlags(uint32(ItemHasData)) |
+ toSave.SetExpiration(time.Duration(expSecs) * time.Second) |
+ toSave.SetValue(data) |
+ } else { |
+ // Set a lock with an infinite timeout. No one else should try to |
+ // serialize this item to memcache until something Put/Delete's it. |
+ toSave.SetFlags(uint32(ItemHasLock)) |
+ toSave.SetExpiration(0) |
+ toSave.SetValue(nonce) |
+ } |
+ toCas = append(toCas, toSave) |
+ } |
+ |
+ j++ |
+ }) |
+ if err != nil { |
+ return err |
+ } |
+ if len(toCas) > 0 { |
+ if err := d.mc.CompareAndSwapMulti(toCas); err != nil { |
+ d.log.Errorf("dscache: CompareAndSwapMulti: %s", err) |
+ } |
+ } |
+ } |
+ |
+ for i, dec := range p.decoded { |
+ cb(dec, p.lme.GetOne(i)) |
+ } |
+ |
+ return nil |
+} |
+ |
+func (d *dsCache) RunInTransaction(f func(context.Context) error, opts *ds.TransactionOptions) error { |
+ txnState := dsTxnState{} |
+ err := d.RawInterface.RunInTransaction(func(ctx context.Context) error { |
+ txnState.Reset() |
+ err := f(context.WithValue(ctx, dsTxnCacheKey, &txnState)) |
+ if err == nil { |
+ txnState.Apply(d.supportContext) |
+ } |
+ return err |
+ }, opts) |
+ if err == nil { |
+ txnState.Release(d.supportContext) |
+ } |
+ return err |
+} |