Chromium Code Reviews| Index: filter/dscache/ds.go |
| diff --git a/filter/dscache/ds.go b/filter/dscache/ds.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..ee3d98b80c3d0ac2292c8d4d28a1c2bffffa7d8d |
| --- /dev/null |
| +++ b/filter/dscache/ds.go |
| @@ -0,0 +1,131 @@ |
| +// Copyright 2015 The Chromium Authors. All rights reserved. |
| +// Use of this source code is governed by a BSD-style license that can be |
| +// found in the LICENSE file. |
| + |
| +package dscache |
| + |
| +import ( |
| + "time" |
| + |
| + ds "github.com/luci/gae/service/datastore" |
| + "github.com/luci/gae/service/memcache" |
| + "golang.org/x/net/context" |
| +) |
| + |
| +type dsCache struct { |
| + ds.RawInterface |
| + |
| + *supportContext |
| +} |
| + |
| +var _ ds.RawInterface = (*dsCache)(nil) |
| + |
| +func (d *dsCache) DeleteMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { |
| + return d.mutation(keys, func() error { |
| + return d.RawInterface.DeleteMulti(keys, cb) |
| + }) |
| +} |
| + |
| +func (d *dsCache) PutMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error { |
| + return d.mutation(keys, func() error { |
| + return d.RawInterface.PutMulti(keys, vals, cb) |
| + }) |
| +} |
| + |
| +func (d *dsCache) GetMulti(keys []ds.Key, metas ds.MultiMetaGetter, cb ds.GetMultiCB) error { |
| + lockItems, nonce := d.mkRandLockItems(keys, metas) |
| + if len(lockItems) == 0 { |
| + return d.RawInterface.GetMulti(keys, metas, cb) |
| + } |
| + |
| + if err := d.mc.AddMulti(lockItems); err != nil { |
|
Vadim Sh.
2015/08/06 01:23:33
what happens if only one item is failed to lock? (
iannucci
2015/08/06 02:37:33
it's not atomic, but it doesn't matter. Either:
|
| + d.log.Errorf("dscache: mc.AddMulti: %s", err) |
|
dnj
2015/08/05 18:32:17
(Debugf)
iannucci
2015/08/06 01:54:01
Did warn
|
| + } |
| + if err := d.mc.GetMulti(lockItems); err != nil { |
| + d.log.Errorf("dscache: mc.GetMulti: %s", err) |
|
dnj
2015/08/05 18:32:17
(Debugf)
iannucci
2015/08/06 01:54:01
Did warn
|
| + } |
| + |
| + p := makePlan(d.aid, d.ns, d.log, &facts{keys, metas, lockItems, nonce}) |
| + |
| + if !p.empty() { |
|
dnj
2015/08/05 18:32:17
Invert: if p.empty() { return nil }
iannucci
2015/08/06 01:54:01
can't, this modifies p.decoded for the loop below
dnj
2015/08/07 16:30:06
Acknowledged.
|
| + toCas := []memcache.Item{} |
|
dnj
2015/08/05 18:32:17
Might as well optimistically allocate capacity.
iannucci
2015/08/06 01:54:01
Eh... estimating the amount to allocate is hard en
dnj
2015/08/07 16:30:06
Acknowledged.
|
| + j := 0 |
| + err := d.RawInterface.GetMulti(p.toGet, p.toGetMeta, func(pm ds.PropertyMap, err error) { |
| + i := p.idxMap[j] |
| + toSave := p.toSave[j] |
|
Vadim Sh.
2015/08/06 01:23:33
ah, I guess appending nils is by design. Looked we
Vadim Sh.
2015/08/06 01:23:33
nit: move j++ to this line. I forgot what "j" was
|
| + |
| + data := []byte(nil) |
| + |
| + // 0 == do not save |
| + // 1 == save |
| + // 2 == lock until next put |
| + saveState := 1 |
| + if err == nil { |
| + p.decoded[i] = pm |
| + if toSave != nil { |
| + data = encodeItemValue(pm) |
| + if len(data) > internalValueSizeLimit { |
| + saveState = 2 |
|
Vadim Sh.
2015/08/06 01:23:33
heh... smart :)
iannucci
2015/08/06 02:37:33
managed to reduce this to just a boolean, but yeah
|
| + d.log.Warningf("dscache: encoded entity too big (%d/%d)!", |
| + len(data), internalValueSizeLimit) |
| + } |
| + } |
| + } |
| + |
| + if err != nil { |
|
dnj
2015/08/05 18:32:17
} else {
iannucci
2015/08/06 01:54:01
Done.
|
| + p.lme.Assign(i, err) |
| + if err != ds.ErrNoSuchEntity { |
| + saveState = 0 |
| + } |
| + } |
| + |
| + if saveState > 0 && toSave != nil { |
| + if saveState == 1 { // save |
| + expSecs := metas.GetMetaDefault(i, CacheExpirationMeta, CacheTimeSeconds).(int64) |
| + toSave.SetFlags(uint32(ItemHasData)) |
| + toSave.SetExpiration(time.Duration(expSecs) * time.Second) |
| + toSave.SetValue(data) |
| + } else { |
| + // Set a lock with an infinite timeout. No one else should try to |
| + // serialize this item to memcache until something Put/Delete's it. |
| + toSave.SetFlags(uint32(ItemHasLock)) |
| + toSave.SetExpiration(0) |
| + toSave.SetValue(nonce) |
| + } |
| + toCas = append(toCas, toSave) |
| + } |
| + |
| + j++ |
| + }) |
| + if err != nil { |
| + return err |
| + } |
| + if len(toCas) > 0 { |
| + if err := d.mc.CompareAndSwapMulti(toCas); err != nil { |
| + d.log.Errorf("dscache: CompareAndSwapMulti: %s", err) |
| + } |
| + } |
| + } |
| + |
| + for i, dec := range p.decoded { |
| + cb(dec, p.lme.GetOne(i)) |
| + } |
| + |
| + return nil |
| +} |
| + |
| +func (d *dsCache) RunInTransaction(f func(context.Context) error, opts *ds.TransactionOptions) error { |
| + txnState := dsTxnState{} |
| + err := d.RawInterface.RunInTransaction(func(ctx context.Context) error { |
| + txnState.Reset() |
| + err := f(context.WithValue(ctx, dsTxnCacheKey, &txnState)) |
| + if err == nil { |
| + txnState.Apply(d.supportContext) |
| + } |
| + return err |
| + }, opts) |
| + if err == nil { |
| + txnState.Release(d.supportContext) |
| + } |
| + return err |
| +} |