| Index: filter/dscache/ds.go
|
| diff --git a/filter/dscache/ds.go b/filter/dscache/ds.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..a1b659ec6fd480a47fdbeecbc82d522c4714fe05
|
| --- /dev/null
|
| +++ b/filter/dscache/ds.go
|
| @@ -0,0 +1,139 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package dscache
|
| +
|
| +import (
|
| + "time"
|
| +
|
| + ds "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/gae/service/memcache"
|
| + log "github.com/luci/luci-go/common/logging"
|
| + "golang.org/x/net/context"
|
| +)
|
| +
|
| +type dsCache struct {
|
| + ds.RawInterface
|
| +
|
| + *supportContext
|
| +}
|
| +
|
| +var _ ds.RawInterface = (*dsCache)(nil)
|
| +
|
| +func (d *dsCache) DeleteMulti(keys []ds.Key, cb ds.DeleteMultiCB) error {
|
| + return d.mutation(keys, func() error {
|
| + return d.RawInterface.DeleteMulti(keys, cb)
|
| + })
|
| +}
|
| +
|
| +func (d *dsCache) PutMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error {
|
| + return d.mutation(keys, func() error {
|
| + return d.RawInterface.PutMulti(keys, vals, cb)
|
| + })
|
| +}
|
| +
|
| +func (d *dsCache) GetMulti(keys []ds.Key, metas ds.MultiMetaGetter, cb ds.GetMultiCB) error {
|
| + lockItems, nonce := d.mkRandLockItems(keys, metas)
|
| + if len(lockItems) == 0 {
|
| + return d.RawInterface.GetMulti(keys, metas, cb)
|
| + }
|
| +
|
| + if err := d.mc.AddMulti(lockItems); err != nil {
|
| + (log.Fields{log.ErrorKey: err}).Warningf(
|
| + d.c, "dscache: GetMulti: memcache.AddMulti")
|
| +
|
| + }
|
| + if err := d.mc.GetMulti(lockItems); err != nil {
|
| + (log.Fields{log.ErrorKey: err}).Warningf(
|
| + d.c, "dscache: GetMulti: memcache.GetMulti")
|
| + }
|
| +
|
| + p := makeFetchPlan(d.c, d.aid, d.ns, &facts{keys, metas, lockItems, nonce})
|
| +
|
| + if !p.empty() {
|
| + // looks like we have something to pull from datastore, and maybe some work
|
| + // to save stuff back to memcache.
|
| +
|
| + toCas := []memcache.Item{}
|
| + j := 0
|
| + err := d.RawInterface.GetMulti(p.toGet, p.toGetMeta, func(pm ds.PropertyMap, err error) {
|
| + i := p.idxMap[j]
|
| + toSave := p.toSave[j]
|
| + j++
|
| +
|
| + data := []byte(nil)
|
| +
|
| + // true: save entity to memcache
|
| + // false: lock entity in memcache forever
|
| + shouldSave := true
|
| + if err == nil {
|
| + p.decoded[i] = pm
|
| + if toSave != nil {
|
| + data = encodeItemValue(pm)
|
| + if len(data) > internalValueSizeLimit {
|
| + shouldSave = false
|
| + log.Warningf(
|
| + d.c, "dscache: encoded entity too big (%d/%d)!",
|
| + len(data), internalValueSizeLimit)
|
| + }
|
| + }
|
| + } else {
|
| + p.lme.Assign(i, err)
|
| + if err != ds.ErrNoSuchEntity {
|
| + return // aka continue to the next entry
|
| + }
|
| + }
|
| +
|
| + if toSave != nil {
|
| + if shouldSave { // save
|
| + expSecs := metas.GetMetaDefault(i, CacheExpirationMeta, CacheTimeSeconds).(int64)
|
| + toSave.SetFlags(uint32(ItemHasData))
|
| + toSave.SetExpiration(time.Duration(expSecs) * time.Second)
|
| + toSave.SetValue(data)
|
| + } else {
|
| + // Set a lock with an infinite timeout. No one else should try to
|
| + // serialize this item to memcache until something Put/Delete's it.
|
| + toSave.SetFlags(uint32(ItemHasLock))
|
| + toSave.SetExpiration(0)
|
| + toSave.SetValue(nil)
|
| + }
|
| + toCas = append(toCas, toSave)
|
| + }
|
| + })
|
| + if err != nil {
|
| + return err
|
| + }
|
| + if len(toCas) > 0 {
|
| + // we have entries to save back to memcache.
|
| + if err := d.mc.CompareAndSwapMulti(toCas); err != nil {
|
| + (log.Fields{log.ErrorKey: err}).Warningf(
|
| + d.c, "dscache: GetMulti: memcache.CompareAndSwapMulti")
|
| + }
|
| + }
|
| + }
|
| +
|
| + // finally, run the callback for all of the decoded items and the errors,
|
| + // if any.
|
| + for i, dec := range p.decoded {
|
| + cb(dec, p.lme.GetOne(i))
|
| + }
|
| +
|
| + return nil
|
| +}
|
| +
|
| +func (d *dsCache) RunInTransaction(f func(context.Context) error, opts *ds.TransactionOptions) error {
|
| + txnState := dsTxnState{}
|
| + err := d.RawInterface.RunInTransaction(func(ctx context.Context) error {
|
| + txnState.reset()
|
| + err := f(context.WithValue(ctx, dsTxnCacheKey, &txnState))
|
| + if err == nil {
|
| + err = txnState.apply(d.supportContext)
|
| + }
|
| + return err
|
| + }, opts)
|
| + if err == nil {
|
| + txnState.release(d.supportContext)
|
| + }
|
| + return err
|
| +}
|
|
|