| Index: tumble/tumble.go
|
| diff --git a/tumble/tumble.go b/tumble/tumble.go
|
| index f5e8f6924b55df6b440ecf477814cde22bfbeafd..0818ce2d305bfc5cb73ce656036fbd6e0e773583 100644
|
| --- a/tumble/tumble.go
|
| +++ b/tumble/tumble.go
|
| @@ -24,7 +24,7 @@ import (
|
| // state machine as a result of some API interaction.
|
| func RunMutation(c context.Context, m Mutation) error {
|
| cfg := getConfig(c)
|
| - shardSet, _, _, err := enterTransactionInternal(txnBuf.FilterRDS(c), cfg, m, 0)
|
| + shardSet, _, _, err := enterTransactionMutation(txnBuf.FilterRDS(c), cfg, m, 0)
|
| if err != nil {
|
| return err
|
| }
|
| @@ -32,11 +32,36 @@ func RunMutation(c context.Context, m Mutation) error {
|
| return nil
|
| }
|
|
|
| -func enterTransactionInternal(c context.Context, cfg *Config, m Mutation, round uint64) (map[taskShard]struct{}, []Mutation, []*ds.Key, error) {
|
| - fromRoot := m.Root(c)
|
| +// RegisterInTransaction opens a lightweight unbuffered transaction on "root"
|
| +// runs "fn" inside of it. Any mutations returned by "fn" will be registered
|
| +// at the end of the transaction if "fn" doesn't return an error.
|
| +//
|
| +// This is useful as an initial starting point without incurring all of the
|
| +// overhead of spinning up a new buffered transaction.
|
| +//
|
| +// During "fn"'s execution, standard Tumble operations such as PutNamedMutation
|
| +// and CancelNamedMutation may be performed.
|
| +func RegisterInTransaction(c context.Context, root *ds.Key, fn func(context.Context) ([]Mutation, error)) error {
|
| + cfg := getConfig(c)
|
| + shardSet, _, _, err := enterTransactionInternal(c, cfg, root, fn, 0)
|
| + if err != nil {
|
| + return err
|
| + }
|
| + fireTasks(c, cfg, shardSet)
|
| + return nil
|
| +}
|
| +
|
| +func enterTransactionMutation(c context.Context, cfg *Config, m Mutation, round uint64) (
|
| + map[taskShard]struct{}, []Mutation, []*ds.Key, error) {
|
| +
|
| + return enterTransactionInternal(c, cfg, m.Root(c), m.RollForward, round)
|
| +}
|
| +
|
| +func enterTransactionInternal(c context.Context, cfg *Config, root *ds.Key, fn func(context.Context) ([]Mutation, error), round uint64) (
|
| + map[taskShard]struct{}, []Mutation, []*ds.Key, error) {
|
|
|
| - if fromRoot == nil {
|
| - return nil, nil, nil, fmt.Errorf("tumble: Passing nil as fromRoot is illegal")
|
| + if root == nil {
|
| + return nil, nil, nil, fmt.Errorf("tumble: Passing nil as root is illegal")
|
| }
|
|
|
| shardSet := map[taskShard]struct{}(nil)
|
| @@ -44,17 +69,17 @@ func enterTransactionInternal(c context.Context, cfg *Config, m Mutation, round
|
| retMutKeys := []*ds.Key(nil)
|
|
|
| err := ds.RunInTransaction(c, func(c context.Context) error {
|
| - // do a Get on the fromRoot to ensure that this transaction is associated
|
| + // do a Get on the root to ensure that this transaction is associated
|
| // with that entity group.
|
| - _, _ = ds.Exists(c, fromRoot)
|
| + _, _ = ds.Exists(c, root)
|
|
|
| - muts, err := m.RollForward(c)
|
| + muts, err := fn(c)
|
| if err != nil {
|
| return err
|
| }
|
|
|
| retMuts = muts
|
| - shardSet, retMutKeys, err = putMutations(c, cfg, fromRoot, muts, round)
|
| + shardSet, retMutKeys, err = putMutations(c, cfg, root, muts, round)
|
|
|
| return err
|
| }, nil)
|
|
|