Index: tumble/tumble.go |
diff --git a/tumble/tumble.go b/tumble/tumble.go |
index f5e8f6924b55df6b440ecf477814cde22bfbeafd..00ddb331473d75a8d2d2bbe58dab0a028c6b405a 100644 |
--- a/tumble/tumble.go |
+++ b/tumble/tumble.go |
@@ -7,12 +7,14 @@ package tumble |
import ( |
"fmt" |
- "github.com/luci/gae/filter/txnBuf" |
- ds "github.com/luci/gae/service/datastore" |
"github.com/luci/luci-go/common/clock" |
"github.com/luci/luci-go/common/data/stringset" |
"github.com/luci/luci-go/common/errors" |
"github.com/luci/luci-go/common/logging" |
+ |
+ "github.com/luci/gae/filter/txnBuf" |
+ ds "github.com/luci/gae/service/datastore" |
+ |
"golang.org/x/net/context" |
) |
@@ -24,7 +26,7 @@ import ( |
// state machine as a result of some API interaction. |
func RunMutation(c context.Context, m Mutation) error { |
cfg := getConfig(c) |
- shardSet, _, _, err := enterTransactionInternal(txnBuf.FilterRDS(c), cfg, m, 0) |
+ shardSet, _, _, err := enterTransactionMutation(txnBuf.FilterRDS(c), cfg, m, 0) |
if err != nil { |
return err |
} |
@@ -32,11 +34,36 @@ func RunMutation(c context.Context, m Mutation) error { |
return nil |
} |
-func enterTransactionInternal(c context.Context, cfg *Config, m Mutation, round uint64) (map[taskShard]struct{}, []Mutation, []*ds.Key, error) { |
- fromRoot := m.Root(c) |
+// RunUnbuffered opens a lightweight unbuffered transaction on "root" |
+// runs "fn" inside of it. Any mutations returned by "fn" will be registered |
+// at the end of the transaction if "fn" doesn't return an error. |
+// |
+// This is useful as an initial starting point without incurring any of the |
+// overhead of spinning up a new buffered transaction. |
+// |
+// During "fn"'s execution, standard Tumble operations such as PutNamedMutation |
+// and CancelNamedMutation may be performed. |
+func RunUnbuffered(c context.Context, root *ds.Key, fn func(context.Context) ([]Mutation, error)) error { |
+ cfg := getConfig(c) |
+ shardSet, _, _, err := enterTransactionInternal(c, cfg, root, fn, 0) |
+ if err != nil { |
+ return err |
+ } |
+ fireTasks(c, cfg, shardSet) |
+ return nil |
+} |
+ |
+func enterTransactionMutation(c context.Context, cfg *Config, m Mutation, round uint64) ( |
+ map[taskShard]struct{}, []Mutation, []*ds.Key, error) { |
+ |
+ return enterTransactionInternal(c, cfg, m.Root(c), m.RollForward, round) |
+} |
+ |
+func enterTransactionInternal(c context.Context, cfg *Config, root *ds.Key, fn func(context.Context) ([]Mutation, error), round uint64) ( |
+ map[taskShard]struct{}, []Mutation, []*ds.Key, error) { |
- if fromRoot == nil { |
- return nil, nil, nil, fmt.Errorf("tumble: Passing nil as fromRoot is illegal") |
+ if root == nil { |
+ return nil, nil, nil, fmt.Errorf("tumble: Passing nil as root is illegal") |
} |
shardSet := map[taskShard]struct{}(nil) |
@@ -44,17 +71,17 @@ func enterTransactionInternal(c context.Context, cfg *Config, m Mutation, round |
retMutKeys := []*ds.Key(nil) |
err := ds.RunInTransaction(c, func(c context.Context) error { |
- // do a Get on the fromRoot to ensure that this transaction is associated |
+ // do a Get on the root to ensure that this transaction is associated |
// with that entity group. |
- _, _ = ds.Exists(c, fromRoot) |
+ _, _ = ds.Exists(c, root) |
- muts, err := m.RollForward(c) |
+ muts, err := fn(c) |
if err != nil { |
return err |
} |
retMuts = muts |
- shardSet, retMutKeys, err = putMutations(c, cfg, fromRoot, muts, round) |
+ shardSet, retMutKeys, err = putMutations(c, cfg, root, muts, round) |
return err |
}, nil) |