| Index: filter/dscache/ds.go
 | 
| diff --git a/filter/dscache/ds.go b/filter/dscache/ds.go
 | 
| new file mode 100644
 | 
| index 0000000000000000000000000000000000000000..a1b659ec6fd480a47fdbeecbc82d522c4714fe05
 | 
| --- /dev/null
 | 
| +++ b/filter/dscache/ds.go
 | 
| @@ -0,0 +1,139 @@
 | 
| +// Copyright 2015 The Chromium Authors. All rights reserved.
 | 
| +// Use of this source code is governed by a BSD-style license that can be
 | 
| +// found in the LICENSE file.
 | 
| +
 | 
| +package dscache
 | 
| +
 | 
| +import (
 | 
| +	"time"
 | 
| +
 | 
| +	ds "github.com/luci/gae/service/datastore"
 | 
| +	"github.com/luci/gae/service/memcache"
 | 
| +	log "github.com/luci/luci-go/common/logging"
 | 
| +	"golang.org/x/net/context"
 | 
| +)
 | 
| +
 | 
| +type dsCache struct {
 | 
| +	ds.RawInterface
 | 
| +
 | 
| +	*supportContext
 | 
| +}
 | 
| +
 | 
| +var _ ds.RawInterface = (*dsCache)(nil)
 | 
| +
 | 
| +func (d *dsCache) DeleteMulti(keys []ds.Key, cb ds.DeleteMultiCB) error {
 | 
| +	return d.mutation(keys, func() error {
 | 
| +		return d.RawInterface.DeleteMulti(keys, cb)
 | 
| +	})
 | 
| +}
 | 
| +
 | 
| +func (d *dsCache) PutMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error {
 | 
| +	return d.mutation(keys, func() error {
 | 
| +		return d.RawInterface.PutMulti(keys, vals, cb)
 | 
| +	})
 | 
| +}
 | 
| +
 | 
| +func (d *dsCache) GetMulti(keys []ds.Key, metas ds.MultiMetaGetter, cb ds.GetMultiCB) error {
 | 
| +	lockItems, nonce := d.mkRandLockItems(keys, metas)
 | 
| +	if len(lockItems) == 0 {
 | 
| +		return d.RawInterface.GetMulti(keys, metas, cb)
 | 
| +	}
 | 
| +
 | 
| +	if err := d.mc.AddMulti(lockItems); err != nil {
 | 
| +		(log.Fields{log.ErrorKey: err}).Warningf(
 | 
| +			d.c, "dscache: GetMulti: memcache.AddMulti")
 | 
| +
 | 
| +	}
 | 
| +	if err := d.mc.GetMulti(lockItems); err != nil {
 | 
| +		(log.Fields{log.ErrorKey: err}).Warningf(
 | 
| +			d.c, "dscache: GetMulti: memcache.GetMulti")
 | 
| +	}
 | 
| +
 | 
| +	p := makeFetchPlan(d.c, d.aid, d.ns, &facts{keys, metas, lockItems, nonce})
 | 
| +
 | 
| +	if !p.empty() {
 | 
| +		// looks like we have something to pull from datastore, and maybe some work
 | 
| +		// to save stuff back to memcache.
 | 
| +
 | 
| +		toCas := []memcache.Item{}
 | 
| +		j := 0
 | 
| +		err := d.RawInterface.GetMulti(p.toGet, p.toGetMeta, func(pm ds.PropertyMap, err error) {
 | 
| +			i := p.idxMap[j]
 | 
| +			toSave := p.toSave[j]
 | 
| +			j++
 | 
| +
 | 
| +			data := []byte(nil)
 | 
| +
 | 
| +			// true: save entity to memcache
 | 
| +			// false: lock entity in memcache forever
 | 
| +			shouldSave := true
 | 
| +			if err == nil {
 | 
| +				p.decoded[i] = pm
 | 
| +				if toSave != nil {
 | 
| +					data = encodeItemValue(pm)
 | 
| +					if len(data) > internalValueSizeLimit {
 | 
| +						shouldSave = false
 | 
| +						log.Warningf(
 | 
| +							d.c, "dscache: encoded entity too big (%d/%d)!",
 | 
| +							len(data), internalValueSizeLimit)
 | 
| +					}
 | 
| +				}
 | 
| +			} else {
 | 
| +				p.lme.Assign(i, err)
 | 
| +				if err != ds.ErrNoSuchEntity {
 | 
| +					return // aka continue to the next entry
 | 
| +				}
 | 
| +			}
 | 
| +
 | 
| +			if toSave != nil {
 | 
| +				if shouldSave { // save
 | 
| +					expSecs := metas.GetMetaDefault(i, CacheExpirationMeta, CacheTimeSeconds).(int64)
 | 
| +					toSave.SetFlags(uint32(ItemHasData))
 | 
| +					toSave.SetExpiration(time.Duration(expSecs) * time.Second)
 | 
| +					toSave.SetValue(data)
 | 
| +				} else {
 | 
| +					// Set a lock with an infinite timeout. No one else should try to
 | 
| +					// serialize this item to memcache until something Put/Delete's it.
 | 
| +					toSave.SetFlags(uint32(ItemHasLock))
 | 
| +					toSave.SetExpiration(0)
 | 
| +					toSave.SetValue(nil)
 | 
| +				}
 | 
| +				toCas = append(toCas, toSave)
 | 
| +			}
 | 
| +		})
 | 
| +		if err != nil {
 | 
| +			return err
 | 
| +		}
 | 
| +		if len(toCas) > 0 {
 | 
| +			// we have entries to save back to memcache.
 | 
| +			if err := d.mc.CompareAndSwapMulti(toCas); err != nil {
 | 
| +				(log.Fields{log.ErrorKey: err}).Warningf(
 | 
| +					d.c, "dscache: GetMulti: memcache.CompareAndSwapMulti")
 | 
| +			}
 | 
| +		}
 | 
| +	}
 | 
| +
 | 
| +	// finally, run the callback for all of the decoded items and the errors,
 | 
| +	// if any.
 | 
| +	for i, dec := range p.decoded {
 | 
| +		cb(dec, p.lme.GetOne(i))
 | 
| +	}
 | 
| +
 | 
| +	return nil
 | 
| +}
 | 
| +
 | 
| +func (d *dsCache) RunInTransaction(f func(context.Context) error, opts *ds.TransactionOptions) error {
 | 
| +	txnState := dsTxnState{}
 | 
| +	err := d.RawInterface.RunInTransaction(func(ctx context.Context) error {
 | 
| +		txnState.reset()
 | 
| +		err := f(context.WithValue(ctx, dsTxnCacheKey, &txnState))
 | 
| +		if err == nil {
 | 
| +			err = txnState.apply(d.supportContext)
 | 
| +		}
 | 
| +		return err
 | 
| +	}, opts)
 | 
| +	if err == nil {
 | 
| +		txnState.release(d.supportContext)
 | 
| +	}
 | 
| +	return err
 | 
| +}
 | 
| 
 |