Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(85)

Side by Side Diff: tumble/tumble.go

Issue 2592753002: Create unbuffered Tumble entry point for LogDog. (Closed)
Patch Set: Remove unrelated change. Created 3 years, 12 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « tumble/process.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package tumble 5 package tumble
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 9
10 "github.com/luci/gae/filter/txnBuf" 10 "github.com/luci/gae/filter/txnBuf"
11 ds "github.com/luci/gae/service/datastore" 11 ds "github.com/luci/gae/service/datastore"
12 "github.com/luci/luci-go/common/clock" 12 "github.com/luci/luci-go/common/clock"
13 "github.com/luci/luci-go/common/data/stringset" 13 "github.com/luci/luci-go/common/data/stringset"
14 "github.com/luci/luci-go/common/errors" 14 "github.com/luci/luci-go/common/errors"
15 "github.com/luci/luci-go/common/logging" 15 "github.com/luci/luci-go/common/logging"
16 "golang.org/x/net/context" 16 "golang.org/x/net/context"
17 ) 17 )
18 18
19 // RunMutation immediately runs the Mutation `m` in a transaction. This method 19 // RunMutation immediately runs the Mutation `m` in a transaction. This method
20 // should be used to start a tumble chain when you have transactional checks 20 // should be used to start a tumble chain when you have transactional checks
21 // to do (e.g. `m` implements the first transactional link in the chain). 21 // to do (e.g. `m` implements the first transactional link in the chain).
22 // 22 //
23 // Usually this is called from your application's handlers to begin a tumble 23 // Usually this is called from your application's handlers to begin a tumble
24 // state machine as a result of some API interaction. 24 // state machine as a result of some API interaction.
25 func RunMutation(c context.Context, m Mutation) error { 25 func RunMutation(c context.Context, m Mutation) error {
26 cfg := getConfig(c) 26 cfg := getConfig(c)
27 » shardSet, _, _, err := enterTransactionInternal(txnBuf.FilterRDS(c), cfg , m, 0) 27 » shardSet, _, _, err := enterTransactionMutation(txnBuf.FilterRDS(c), cfg , m, 0)
28 if err != nil { 28 if err != nil {
29 return err 29 return err
30 } 30 }
31 fireTasks(c, cfg, shardSet) 31 fireTasks(c, cfg, shardSet)
32 return nil 32 return nil
33 } 33 }
34 34
35 func enterTransactionInternal(c context.Context, cfg *Config, m Mutation, round uint64) (map[taskShard]struct{}, []Mutation, []*ds.Key, error) { 35 // RegisterInTransaction opens a lightweight unbuffered transaction on "root"
iannucci 2016/12/21 10:05:19 Why not 'RunUnbuffered'? There's no registration h
36 » fromRoot := m.Root(c) 36 // runs "fn" inside of it. Any mutations returned by "fn" will be registered
37 // at the end of the transaction if "fn" doesn't return an error.
38 //
39 // This is useful as an initial starting point without incurring all of the
40 // overhead of spinning up a new buffered transaction.
iannucci 2016/12/21 10:05:19 s/all of the//
41 //
42 // During "fn"'s execution, standard Tumble operations such as PutNamedMutation
43 // and CancelNamedMutation may be performed.
44 func RegisterInTransaction(c context.Context, root *ds.Key, fn func(context.Cont ext) ([]Mutation, error)) error {
45 » cfg := getConfig(c)
46 » shardSet, _, _, err := enterTransactionInternal(c, cfg, root, fn, 0)
47 » if err != nil {
48 » » return err
49 » }
50 » fireTasks(c, cfg, shardSet)
51 » return nil
52 }
37 53
38 » if fromRoot == nil { 54 func enterTransactionMutation(c context.Context, cfg *Config, m Mutation, round uint64) (
39 » » return nil, nil, nil, fmt.Errorf("tumble: Passing nil as fromRoo t is illegal") 55 » map[taskShard]struct{}, []Mutation, []*ds.Key, error) {
56
57 » return enterTransactionInternal(c, cfg, m.Root(c), m.RollForward, round)
58 }
59
60 func enterTransactionInternal(c context.Context, cfg *Config, root *ds.Key, fn f unc(context.Context) ([]Mutation, error), round uint64) (
61 » map[taskShard]struct{}, []Mutation, []*ds.Key, error) {
62
63 » if root == nil {
64 » » return nil, nil, nil, fmt.Errorf("tumble: Passing nil as root is illegal")
40 } 65 }
41 66
42 shardSet := map[taskShard]struct{}(nil) 67 shardSet := map[taskShard]struct{}(nil)
43 retMuts := []Mutation(nil) 68 retMuts := []Mutation(nil)
44 retMutKeys := []*ds.Key(nil) 69 retMutKeys := []*ds.Key(nil)
45 70
46 err := ds.RunInTransaction(c, func(c context.Context) error { 71 err := ds.RunInTransaction(c, func(c context.Context) error {
47 » » // do a Get on the fromRoot to ensure that this transaction is a ssociated 72 » » // do a Get on the root to ensure that this transaction is assoc iated
48 // with that entity group. 73 // with that entity group.
49 » » _, _ = ds.Exists(c, fromRoot) 74 » » _, _ = ds.Exists(c, root)
50 75
51 » » muts, err := m.RollForward(c) 76 » » muts, err := fn(c)
52 if err != nil { 77 if err != nil {
53 return err 78 return err
54 } 79 }
55 80
56 retMuts = muts 81 retMuts = muts
57 » » shardSet, retMutKeys, err = putMutations(c, cfg, fromRoot, muts, round) 82 » » shardSet, retMutKeys, err = putMutations(c, cfg, root, muts, rou nd)
58 83
59 return err 84 return err
60 }, nil) 85 }, nil)
61 if err != nil { 86 if err != nil {
62 return nil, nil, nil, err 87 return nil, nil, nil, err
63 } 88 }
64 89
65 return shardSet, retMuts, retMutKeys, nil 90 return shardSet, retMuts, retMutKeys, nil
66 } 91 }
67 92
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
112 // CancelNamedMutations does a best-effort cancellation of the named mutations. 137 // CancelNamedMutations does a best-effort cancellation of the named mutations.
113 func CancelNamedMutations(c context.Context, parent *ds.Key, names ...string) er ror { 138 func CancelNamedMutations(c context.Context, parent *ds.Key, names ...string) er ror {
114 toDel := make([]*ds.Key, 0, len(names)) 139 toDel := make([]*ds.Key, 0, len(names))
115 nameSet := stringset.NewFromSlice(names...) 140 nameSet := stringset.NewFromSlice(names...)
116 nameSet.Iter(func(name string) bool { 141 nameSet.Iter(func(name string) bool {
117 toDel = append(toDel, ds.NewKey(c, "tumble.Mutation", "n:"+name, 0, parent)) 142 toDel = append(toDel, ds.NewKey(c, "tumble.Mutation", "n:"+name, 0, parent))
118 return true 143 return true
119 }) 144 })
120 return errors.Filter(ds.Delete(c, toDel), ds.ErrNoSuchEntity) 145 return errors.Filter(ds.Delete(c, toDel), ds.ErrNoSuchEntity)
121 } 146 }
OLDNEW
« no previous file with comments | « tumble/process.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698