| Index: filter/dscache/ds.go | 
| diff --git a/filter/dscache/ds.go b/filter/dscache/ds.go | 
| new file mode 100644 | 
| index 0000000000000000000000000000000000000000..1a3e685cb39f7bb1457a3a4f97c5f75c6a5ef22d | 
| --- /dev/null | 
| +++ b/filter/dscache/ds.go | 
| @@ -0,0 +1,232 @@ | 
| +// Copyright 2015 The Chromium Authors. All rights reserved. | 
| +// Use of this source code is governed by a BSD-style license that can be | 
| +// found in the LICENSE file. | 
| + | 
| +package dscache | 
| + | 
| +import ( | 
| +	"bytes" | 
| +	"fmt" | 
| +	"math/rand" | 
| +	"time" | 
| + | 
| +	ds "github.com/luci/gae/service/datastore" | 
| +	"github.com/luci/gae/service/memcache" | 
| +	"github.com/luci/luci-go/common/errors" | 
| +	"github.com/luci/luci-go/common/logging" | 
| +	"golang.org/x/net/context" | 
| +) | 
| + | 
| +type dsCache struct { | 
| +	ds.RawInterface | 
| + | 
| +	aid string | 
| +	ns  string | 
| + | 
| +	mc  memcache.Interface | 
| +	log logging.Logger | 
| +	mr  *rand.Rand | 
| +} | 
| + | 
| +var _ ds.RawInterface = (*dsCache)(nil) | 
| + | 
| +func (d *dsCache) mutation(keys []ds.Key, f func() error) error { | 
| +	lockItems, lockKeys := mkAllLockItems(d.mc, keys) | 
| +	if lockItems == nil { | 
| +		return f() | 
| +	} | 
| +	if err := d.mc.SetMulti(lockItems); err != nil { | 
| +		d.log.Errorf("dscache: mc.SetMulti: %s", err) | 
| +	} | 
| +	err := f() | 
| +	if err == nil { | 
| +		if err := d.mc.DeleteMulti(lockKeys); err != nil { | 
| +			d.log.Errorf("dscache: mc.DeleteMulti: %s", err) | 
| +		} | 
| +	} | 
| +	return err | 
| +} | 
| + | 
| +func (d *dsCache) DeleteMulti(keys []ds.Key, cb ds.DeleteMultiCB) error { | 
| +	return d.mutation(keys, func() error { | 
| +		return d.RawInterface.DeleteMulti(keys, cb) | 
| +	}) | 
| +} | 
| + | 
| +func (d *dsCache) PutMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error { | 
| +	return d.mutation(keys, func() error { | 
| +		return d.RawInterface.PutMulti(keys, vals, cb) | 
| +	}) | 
| +} | 
| + | 
| +type facts struct { | 
| +	getKeys   []ds.Key | 
| +	getMeta   []ds.PropertyMap | 
| +	lockItems []memcache.Item | 
| +} | 
| + | 
| +type plan struct { | 
| +	keepMeta bool | 
| + | 
| +	idxMap    []int | 
| +	toGet     []ds.Key | 
| +	toGetMeta []ds.PropertyMap | 
| +	toSave    []memcache.Item | 
| + | 
| +	decoded []ds.PropertyMap | 
| +	lme     errors.LazyMultiError | 
| +} | 
| + | 
| +func (p *plan) add(idx int, get ds.Key, m ds.PropertyMap, save memcache.Item) { | 
| +	p.idxMap = append(p.idxMap, idx) | 
| +	p.toGet = append(p.toGet, get) | 
| + | 
| +	if save != nil { | 
| +		save.SetFlags(uint32(ItemHasData)) | 
| +	} | 
| +	p.toSave = append(p.toSave, save) | 
| + | 
| +	if p.keepMeta { | 
| +		p.toGetMeta = append(p.toGetMeta, m) | 
| +	} | 
| +} | 
| + | 
| +func (p *plan) empty() bool { | 
| +	return len(p.idxMap) == 0 | 
| +} | 
| + | 
| +func (d *dsCache) makePlan(f *facts) *plan { | 
| +	// get index -> items index | 
| +	p := plan{ | 
| +		keepMeta: f.getMeta != nil, | 
| +		decoded:  make([]ds.PropertyMap, len(f.lockItems)), | 
| +		lme:      errors.LazyMultiError{Size: len(f.lockItems)}, | 
| +	} | 
| +	for i, lockItm := range f.lockItems { | 
| +		curItm := f.lockItems[i] | 
| +		m := ds.PropertyMap(nil) | 
| +		if f.getMeta != nil { | 
| +			m = f.getMeta[i] | 
| +		} | 
| + | 
| +		getKey := f.getKeys[i] | 
| +		if !meta(m, CacheEnableMeta, true).(bool) { | 
| +			fmt.Println("cache is disabled!") | 
| +			p.add(i, getKey, m, nil) | 
| +			continue | 
| +		} | 
| + | 
| +		if curItm == nil { | 
| +			fmt.Println("missing item in memcache!") | 
| +			p.add(i, getKey, m, nil) | 
| +			continue | 
| +		} | 
| + | 
| +		flg := FlagValue(curItm.Flags()) | 
| +		if flg == ItemHasLock { | 
| +			if !bytes.Equal(curItm.Value(), lockItm.Value()) { | 
| +				curItm = nil // someone else has the lock, don't save | 
| +			} | 
| +			fmt.Println("found lock item in memcache!") | 
| +			p.add(i, getKey, m, curItm) | 
| +			continue | 
| +		} | 
| + | 
| +		pmap, err := decodeValue(curItm.Value(), d.aid, d.ns) | 
| +		switch err { | 
| +		case nil: | 
| +			p.decoded[i] = pmap | 
| +		case ds.ErrNoSuchEntity: | 
| +			p.lme.Assign(i, ds.ErrNoSuchEntity) | 
| +		default: | 
| +			d.log.Errorf("dscache: error decoding %s, %s: %s", curItm.Key(), getKey, err) | 
| +			fmt.Println("failed to decode!") | 
| +			p.add(i, getKey, m, curItm) | 
| +		} | 
| +	} | 
| +	return &p | 
| +} | 
| + | 
| +func (d *dsCache) GetMulti(keys []ds.Key, metas []ds.PropertyMap, cb ds.GetMultiCB) error { | 
| +	lockItems := mkRandLockItems(d.mc, d.mr, keys) | 
| +	if lockItems == nil { | 
| +		return d.RawInterface.GetMulti(keys, metas, cb) | 
| +	} | 
| + | 
| +	if err := d.mc.AddMulti(lockItems); err != nil { | 
| +		d.log.Errorf("dscache: mc.AddMulti: %s", err) | 
| +	} | 
| +	if err := d.mc.GetMulti(lockItems); err != nil { | 
| +		d.log.Errorf("dscache: mc.GetMulti: %s", err) | 
| +	} | 
| + | 
| +	p := d.makePlan(&facts{keys, metas, lockItems}) | 
| + | 
| +	if !p.empty() { | 
| +		fmt.Println("have to touch DS :(", p) | 
| +		toCas := []memcache.Item{} | 
| +		j := 0 | 
| +		err := d.RawInterface.GetMulti(p.toGet, p.toGetMeta, func(pm ds.PropertyMap, err error) { | 
| +			i := p.idxMap[j] | 
| +			toSave := p.toSave[j] | 
| + | 
| +			data := []byte(nil) | 
| + | 
| +			if err == nil { | 
| +				p.decoded[i] = pm | 
| +				if toSave != nil { | 
| +					data, err = mkItemData(pm) | 
| +				} | 
| +			} | 
| + | 
| +			canSave := true | 
| +			if err != nil { | 
| +				p.lme.Assign(i, err) | 
| +				canSave = err == ds.ErrNoSuchEntity | 
| +			} | 
| + | 
| +			if canSave && toSave != nil { | 
| +				m := ds.PropertyMap(nil) | 
| +				if p.toGetMeta != nil { | 
| +					m = p.toGetMeta[j] | 
| +				} | 
| +				expSecs := meta(m, CacheExpirationMeta, CacheTimeSeconds).(int64) | 
| +				toSave.SetExpiration(time.Duration(expSecs) * time.Second) | 
| +				toSave.SetValue(data) | 
| +				toCas = append(toCas, toSave) | 
| +			} | 
| + | 
| +			j++ | 
| +		}) | 
| +		if err != nil { | 
| +			return err | 
| +		} | 
| +		if len(toCas) > 0 { | 
| +			if err := d.mc.CompareAndSwapMulti(toCas); err != nil { | 
| +				d.log.Errorf("dscache: CompareAndSwapMulti: %s", err) | 
| +			} | 
| +		} | 
| +	} | 
| + | 
| +	for i, dec := range p.decoded { | 
| +		cb(dec, p.lme.GetOne(i)) | 
| +	} | 
| + | 
| +	return nil | 
| +} | 
| + | 
| +func (d *dsCache) RunInTransaction(f func(context.Context) error, opts *ds.TransactionOptions) error { | 
| +	txnState := dsTxnState{} | 
| +	err := d.RawInterface.RunInTransaction(func(ctx context.Context) error { | 
| +		txnState.Reset() | 
| +		err := f(context.WithValue(ctx, dsTxnCacheKey, &txnState)) | 
| +		if err != nil { | 
| +			txnState.Apply(d.mc, d.log) | 
| +		} | 
| +		return err | 
| +	}, opts) | 
| +	if err == nil { | 
| +		txnState.Release(d.mc, d.log) | 
| +	} | 
| +	return err | 
| +} | 
|  |