Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1369)

Unified Diff: filter/txnBuf/ds_txn.go

Issue 1309803004: Add transaction buffer filter. (Closed) Base URL: https://github.com/luci/gae.git@add_query_support
Patch Set: one more test Created 5 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: filter/txnBuf/ds_txn.go
diff --git a/filter/txnBuf/ds_txn.go b/filter/txnBuf/ds_txn.go
new file mode 100644
index 0000000000000000000000000000000000000000..862329289bd202c84ec961931d89199ed7d93421
--- /dev/null
+++ b/filter/txnBuf/ds_txn.go
@@ -0,0 +1,170 @@
+// Copyright 2015 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package txnBuf
+
+import (
+ ds "github.com/luci/gae/service/datastore"
+ "github.com/luci/luci-go/common/errors"
+ "golang.org/x/net/context"
+)
+
+// ErrTransactionTooLarge is returned when applying an inner transaction would
+// cause an outer transaction to become too large.
+var ErrTransactionTooLarge = errors.New(
+ "applying the transaction would make the parent transaction too large")
+
+type dsTxnBuf struct {
+ ds.RawInterface
+
+ ic context.Context
+ state *txnBufState
+}
+
+var _ ds.RawInterface = (*dsTxnBuf)(nil)
+
+func (d *dsTxnBuf) GetMulti(keys []*ds.Key, metas ds.MultiMetaGetter, cb ds.GetMultiCB) error {
+ data, err := d.state.getMulti(keys)
+ if err != nil {
+ return err
+ }
+
+ idxMap := []int(nil)
+ getKeys := []*ds.Key(nil)
+ getMetas := ds.MultiMetaGetter(nil)
+ lme := errors.NewLazyMultiError(len(keys))
+
+ for i, itm := range data {
+ if !itm.buffered {
+ idxMap = append(idxMap, i)
+ getKeys = append(getKeys, itm.key)
+ getMetas = append(getMetas, metas.GetSingle(i))
+ }
+ }
+
+ if len(idxMap) > 0 {
+ j := 0
+ err := d.state.parentDS.GetMulti(getKeys, getMetas, func(pm ds.PropertyMap, err error) {
+ if err == ds.ErrNoSuchEntity {
+ return
+ }
+ i := idxMap[j]
+ if !lme.Assign(i, err) {
+ data[i].key = keys[j]
+ data[i].data = pm
+ }
+ j++
Vadim Sh. 2015/09/28 18:52:56 this should probably be before err == ds.ErrNoSuch
iannucci 2015/09/29 03:21:37 ah, yeah. I added a test for this too.
+ })
+ if err != nil {
+ return err
+ }
+ }
+
+ for i, itm := range data {
+ err := lme.GetOne(i)
+ if err != nil {
+ cb(nil, err)
+ } else if itm.data == nil {
+ cb(nil, ds.ErrNoSuchEntity)
+ } else {
+ cb(itm.data, nil)
+ }
+ }
+ return nil
Vadim Sh. 2015/09/28 18:52:56 is it ok to return nil here even if lme contains s
iannucci 2015/09/29 03:21:38 yeah, that's the convention. Basically either the
+}
+
+func (d *dsTxnBuf) PutMulti(keys []*ds.Key, vals []ds.PropertyMap, cb ds.PutMultiCB) error {
+ lme := errors.NewLazyMultiError(len(keys))
+ realKeys := []*ds.Key(nil)
+ for i, key := range keys {
+ if key.Incomplete() {
+ start, err := d.state.parentDS.AllocateIDs(key, 1)
Vadim Sh. 2015/09/28 18:52:56 in theory it should be possible to batch\paralleli
iannucci 2015/09/29 03:21:38 Thought of this, and yeah it does. However I'm pre
+ if !lme.Assign(i, err) {
+ if realKeys == nil {
+ realKeys = make([]*ds.Key, len(keys))
+ copy(realKeys, keys)
+ }
+
+ aid, ns, toks := key.Split()
+ toks[len(toks)-1].IntID = start
+ realKeys[i] = ds.NewKeyToks(aid, ns, toks)
+ }
+ }
+ }
+ if err := lme.Get(); err != nil {
+ for _, e := range err.(errors.MultiError) {
+ cb(nil, e)
+ }
+ return nil
+ }
+
+ if realKeys == nil {
+ realKeys = keys
+ }
+
+ err := d.state.putMulti(realKeys, vals)
+ if err != nil {
+ return err
Vadim Sh. 2015/09/28 18:52:56 is it ok not to call callback here?
iannucci 2015/09/29 03:21:38 yeah per the convention.
+ }
+
+ for _, k := range realKeys {
+ cb(k, nil)
+ }
+ return nil
+}
+
+func (d *dsTxnBuf) DeleteMulti(keys []*ds.Key, cb ds.DeleteMultiCB) error {
+ err := d.state.deleteMulti(keys)
Vadim Sh. 2015/09/28 18:52:56 what about deleting them from _real_ datastore? st
iannucci 2015/09/29 03:21:38 all _real_ datastore activity happens in the apply
Vadim Sh. 2015/09/29 03:27:26 Yes, I understand that. But if I open a new buffer
+ if err != nil {
+ return err
+ }
+
+ for range keys {
+ cb(nil)
+ }
+ return nil
+}
+
+func (d *dsTxnBuf) Run(fq *ds.FinalizedQuery, cb ds.RawRunCB) error {
+ if start, end := fq.Bounds(); start != nil || end != nil {
+ return errors.New("txnBuf filter does not support query cursors")
+ }
+
+ limit, limitSet := fq.Limit()
+ offset, _ := fq.Offset()
+ keysOnly := fq.KeysOnly()
+
+ project := fq.Project()
+ needSlimming := len(project) > 0
+
+ d.state.Lock()
+ defer d.state.Unlock()
+
+ return runMergedQueries(fq, d.state, func(key *ds.Key, data ds.PropertyMap) bool {
+ if offset > 0 {
+ offset--
+ return true
+ }
+ if limitSet {
+ if limit == 0 {
+ return false
+ }
+ limit--
+ }
+ if keysOnly {
+ data = nil
+ } else if needSlimming {
+ newData := make(ds.PropertyMap, len(project))
+ for _, p := range project {
+ newData[p] = data[p]
+ }
+ data = newData
+ }
+ return cb(key, data, nil)
+ })
+}
+
+func (d *dsTxnBuf) RunInTransaction(cb func(context.Context) error, opts *ds.TransactionOptions) error {
+ return withTxnBuf(d.ic, cb, opts)
+}

Powered by Google App Engine
This is Rietveld 408576698