| Index: filter/dscache/ds.go
|
| diff --git a/filter/dscache/ds.go b/filter/dscache/ds.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..1a3e685cb39f7bb1457a3a4f97c5f75c6a5ef22d
|
| --- /dev/null
|
| +++ b/filter/dscache/ds.go
|
| @@ -0,0 +1,232 @@
|
| +// Copyright 2015 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package dscache
|
| +
|
| +import (
|
| + "bytes"
|
| + "fmt"
|
| + "math/rand"
|
| + "time"
|
| +
|
| + ds "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/gae/service/memcache"
|
| + "github.com/luci/luci-go/common/errors"
|
| + "github.com/luci/luci-go/common/logging"
|
| + "golang.org/x/net/context"
|
| +)
|
| +
|
| +type dsCache struct {
|
| + ds.RawInterface
|
| +
|
| + aid string
|
| + ns string
|
| +
|
| + mc memcache.Interface
|
| + log logging.Logger
|
| + mr *rand.Rand
|
| +}
|
| +
|
| +var _ ds.RawInterface = (*dsCache)(nil)
|
| +
|
| +func (d *dsCache) mutation(keys []ds.Key, f func() error) error {
|
| + lockItems, lockKeys := mkAllLockItems(d.mc, keys)
|
| + if lockItems == nil {
|
| + return f()
|
| + }
|
| + if err := d.mc.SetMulti(lockItems); err != nil {
|
| + d.log.Errorf("dscache: mc.SetMulti: %s", err)
|
| + }
|
| + err := f()
|
| + if err == nil {
|
| + if err := d.mc.DeleteMulti(lockKeys); err != nil {
|
| + d.log.Errorf("dscache: mc.DeleteMulti: %s", err)
|
| + }
|
| + }
|
| + return err
|
| +}
|
| +
|
| +func (d *dsCache) DeleteMulti(keys []ds.Key, cb ds.DeleteMultiCB) error {
|
| + return d.mutation(keys, func() error {
|
| + return d.RawInterface.DeleteMulti(keys, cb)
|
| + })
|
| +}
|
| +
|
| +func (d *dsCache) PutMulti(keys []ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error {
|
| + return d.mutation(keys, func() error {
|
| + return d.RawInterface.PutMulti(keys, vals, cb)
|
| + })
|
| +}
|
| +
|
| +type facts struct {
|
| + getKeys []ds.Key
|
| + getMeta []ds.PropertyMap
|
| + lockItems []memcache.Item
|
| +}
|
| +
|
| +type plan struct {
|
| + keepMeta bool
|
| +
|
| + idxMap []int
|
| + toGet []ds.Key
|
| + toGetMeta []ds.PropertyMap
|
| + toSave []memcache.Item
|
| +
|
| + decoded []ds.PropertyMap
|
| + lme errors.LazyMultiError
|
| +}
|
| +
|
| +func (p *plan) add(idx int, get ds.Key, m ds.PropertyMap, save memcache.Item) {
|
| + p.idxMap = append(p.idxMap, idx)
|
| + p.toGet = append(p.toGet, get)
|
| +
|
| + if save != nil {
|
| + save.SetFlags(uint32(ItemHasData))
|
| + }
|
| + p.toSave = append(p.toSave, save)
|
| +
|
| + if p.keepMeta {
|
| + p.toGetMeta = append(p.toGetMeta, m)
|
| + }
|
| +}
|
| +
|
| +func (p *plan) empty() bool {
|
| + return len(p.idxMap) == 0
|
| +}
|
| +
|
| +func (d *dsCache) makePlan(f *facts) *plan {
|
| + // get index -> items index
|
| + p := plan{
|
| + keepMeta: f.getMeta != nil,
|
| + decoded: make([]ds.PropertyMap, len(f.lockItems)),
|
| + lme: errors.LazyMultiError{Size: len(f.lockItems)},
|
| + }
|
| + for i, lockItm := range f.lockItems {
|
| + curItm := f.lockItems[i]
|
| + m := ds.PropertyMap(nil)
|
| + if f.getMeta != nil {
|
| + m = f.getMeta[i]
|
| + }
|
| +
|
| + getKey := f.getKeys[i]
|
| + if !meta(m, CacheEnableMeta, true).(bool) {
|
| + fmt.Println("cache is disabled!")
|
| + p.add(i, getKey, m, nil)
|
| + continue
|
| + }
|
| +
|
| + if curItm == nil {
|
| + fmt.Println("missing item in memcache!")
|
| + p.add(i, getKey, m, nil)
|
| + continue
|
| + }
|
| +
|
| + flg := FlagValue(curItm.Flags())
|
| + if flg == ItemHasLock {
|
| + if !bytes.Equal(curItm.Value(), lockItm.Value()) {
|
| + curItm = nil // someone else has the lock, don't save
|
| + }
|
| + fmt.Println("found lock item in memcache!")
|
| + p.add(i, getKey, m, curItm)
|
| + continue
|
| + }
|
| +
|
| + pmap, err := decodeValue(curItm.Value(), d.aid, d.ns)
|
| + switch err {
|
| + case nil:
|
| + p.decoded[i] = pmap
|
| + case ds.ErrNoSuchEntity:
|
| + p.lme.Assign(i, ds.ErrNoSuchEntity)
|
| + default:
|
| + d.log.Errorf("dscache: error decoding %s, %s: %s", curItm.Key(), getKey, err)
|
| + fmt.Println("failed to decode!")
|
| + p.add(i, getKey, m, curItm)
|
| + }
|
| + }
|
| + return &p
|
| +}
|
| +
|
| +func (d *dsCache) GetMulti(keys []ds.Key, metas []ds.PropertyMap, cb ds.GetMultiCB) error {
|
| + lockItems := mkRandLockItems(d.mc, d.mr, keys)
|
| + if lockItems == nil {
|
| + return d.RawInterface.GetMulti(keys, metas, cb)
|
| + }
|
| +
|
| + if err := d.mc.AddMulti(lockItems); err != nil {
|
| + d.log.Errorf("dscache: mc.AddMulti: %s", err)
|
| + }
|
| + if err := d.mc.GetMulti(lockItems); err != nil {
|
| + d.log.Errorf("dscache: mc.GetMulti: %s", err)
|
| + }
|
| +
|
| + p := d.makePlan(&facts{keys, metas, lockItems})
|
| +
|
| + if !p.empty() {
|
| + fmt.Println("have to touch DS :(", p)
|
| + toCas := []memcache.Item{}
|
| + j := 0
|
| + err := d.RawInterface.GetMulti(p.toGet, p.toGetMeta, func(pm ds.PropertyMap, err error) {
|
| + i := p.idxMap[j]
|
| + toSave := p.toSave[j]
|
| +
|
| + data := []byte(nil)
|
| +
|
| + if err == nil {
|
| + p.decoded[i] = pm
|
| + if toSave != nil {
|
| + data, err = mkItemData(pm)
|
| + }
|
| + }
|
| +
|
| + canSave := true
|
| + if err != nil {
|
| + p.lme.Assign(i, err)
|
| + canSave = err == ds.ErrNoSuchEntity
|
| + }
|
| +
|
| + if canSave && toSave != nil {
|
| + m := ds.PropertyMap(nil)
|
| + if p.toGetMeta != nil {
|
| + m = p.toGetMeta[j]
|
| + }
|
| + expSecs := meta(m, CacheExpirationMeta, CacheTimeSeconds).(int64)
|
| + toSave.SetExpiration(time.Duration(expSecs) * time.Second)
|
| + toSave.SetValue(data)
|
| + toCas = append(toCas, toSave)
|
| + }
|
| +
|
| + j++
|
| + })
|
| + if err != nil {
|
| + return err
|
| + }
|
| + if len(toCas) > 0 {
|
| + if err := d.mc.CompareAndSwapMulti(toCas); err != nil {
|
| + d.log.Errorf("dscache: CompareAndSwapMulti: %s", err)
|
| + }
|
| + }
|
| + }
|
| +
|
| + for i, dec := range p.decoded {
|
| + cb(dec, p.lme.GetOne(i))
|
| + }
|
| +
|
| + return nil
|
| +}
|
| +
|
| +func (d *dsCache) RunInTransaction(f func(context.Context) error, opts *ds.TransactionOptions) error {
|
| + txnState := dsTxnState{}
|
| + err := d.RawInterface.RunInTransaction(func(ctx context.Context) error {
|
| + txnState.Reset()
|
| + err := f(context.WithValue(ctx, dsTxnCacheKey, &txnState))
|
| + if err != nil {
|
| + txnState.Apply(d.mc, d.log)
|
| + }
|
| + return err
|
| + }, opts)
|
| + if err == nil {
|
| + txnState.Release(d.mc, d.log)
|
| + }
|
| + return err
|
| +}
|
|
|