| Index: tumble/tumble.go
|
| diff --git a/tumble/tumble.go b/tumble/tumble.go
|
| index f5e8f6924b55df6b440ecf477814cde22bfbeafd..00ddb331473d75a8d2d2bbe58dab0a028c6b405a 100644
|
| --- a/tumble/tumble.go
|
| +++ b/tumble/tumble.go
|
| @@ -7,12 +7,14 @@ package tumble
|
| import (
|
| "fmt"
|
|
|
| - "github.com/luci/gae/filter/txnBuf"
|
| - ds "github.com/luci/gae/service/datastore"
|
| "github.com/luci/luci-go/common/clock"
|
| "github.com/luci/luci-go/common/data/stringset"
|
| "github.com/luci/luci-go/common/errors"
|
| "github.com/luci/luci-go/common/logging"
|
| +
|
| + "github.com/luci/gae/filter/txnBuf"
|
| + ds "github.com/luci/gae/service/datastore"
|
| +
|
| "golang.org/x/net/context"
|
| )
|
|
|
| @@ -24,7 +26,7 @@ import (
|
| // state machine as a result of some API interaction.
|
| func RunMutation(c context.Context, m Mutation) error {
|
| cfg := getConfig(c)
|
| - shardSet, _, _, err := enterTransactionInternal(txnBuf.FilterRDS(c), cfg, m, 0)
|
| + shardSet, _, _, err := enterTransactionMutation(txnBuf.FilterRDS(c), cfg, m, 0)
|
| if err != nil {
|
| return err
|
| }
|
| @@ -32,11 +34,36 @@ func RunMutation(c context.Context, m Mutation) error {
|
| return nil
|
| }
|
|
|
| -func enterTransactionInternal(c context.Context, cfg *Config, m Mutation, round uint64) (map[taskShard]struct{}, []Mutation, []*ds.Key, error) {
|
| - fromRoot := m.Root(c)
|
| +// RunUnbuffered opens a lightweight unbuffered transaction on "root"
|
| +// runs "fn" inside of it. Any mutations returned by "fn" will be registered
|
| +// at the end of the transaction if "fn" doesn't return an error.
|
| +//
|
| +// This is useful as an initial starting point without incurring any of the
|
| +// overhead of spinning up a new buffered transaction.
|
| +//
|
| +// During "fn"'s execution, standard Tumble operations such as PutNamedMutation
|
| +// and CancelNamedMutation may be performed.
|
| +func RunUnbuffered(c context.Context, root *ds.Key, fn func(context.Context) ([]Mutation, error)) error {
|
| + cfg := getConfig(c)
|
| + shardSet, _, _, err := enterTransactionInternal(c, cfg, root, fn, 0)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + fireTasks(c, cfg, shardSet)
|
| + return nil
|
| +}
|
| +
|
| +func enterTransactionMutation(c context.Context, cfg *Config, m Mutation, round uint64) (
|
| + map[taskShard]struct{}, []Mutation, []*ds.Key, error) {
|
| +
|
| + return enterTransactionInternal(c, cfg, m.Root(c), m.RollForward, round)
|
| +}
|
| +
|
| +func enterTransactionInternal(c context.Context, cfg *Config, root *ds.Key, fn func(context.Context) ([]Mutation, error), round uint64) (
|
| + map[taskShard]struct{}, []Mutation, []*ds.Key, error) {
|
|
|
| - if fromRoot == nil {
|
| - return nil, nil, nil, fmt.Errorf("tumble: Passing nil as fromRoot is illegal")
|
| + if root == nil {
|
| + return nil, nil, nil, fmt.Errorf("tumble: Passing nil as root is illegal")
|
| }
|
|
|
| shardSet := map[taskShard]struct{}(nil)
|
| @@ -44,17 +71,17 @@ func enterTransactionInternal(c context.Context, cfg *Config, m Mutation, round
|
| retMutKeys := []*ds.Key(nil)
|
|
|
| err := ds.RunInTransaction(c, func(c context.Context) error {
|
| - // do a Get on the fromRoot to ensure that this transaction is associated
|
| + // do a Get on the root to ensure that this transaction is associated
|
| // with that entity group.
|
| - _, _ = ds.Exists(c, fromRoot)
|
| + _, _ = ds.Exists(c, root)
|
|
|
| - muts, err := m.RollForward(c)
|
| + muts, err := fn(c)
|
| if err != nil {
|
| return err
|
| }
|
|
|
| retMuts = muts
|
| - shardSet, retMutKeys, err = putMutations(c, cfg, fromRoot, muts, round)
|
| + shardSet, retMutKeys, err = putMutations(c, cfg, root, muts, round)
|
|
|
| return err
|
| }, nil)
|
|
|