OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package tumble | 5 package tumble |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 | 9 |
10 "github.com/luci/gae/filter/txnBuf" | |
11 ds "github.com/luci/gae/service/datastore" | |
12 "github.com/luci/luci-go/common/clock" | 10 "github.com/luci/luci-go/common/clock" |
13 "github.com/luci/luci-go/common/data/stringset" | 11 "github.com/luci/luci-go/common/data/stringset" |
14 "github.com/luci/luci-go/common/errors" | 12 "github.com/luci/luci-go/common/errors" |
15 "github.com/luci/luci-go/common/logging" | 13 "github.com/luci/luci-go/common/logging" |
| 14 |
| 15 "github.com/luci/gae/filter/txnBuf" |
| 16 ds "github.com/luci/gae/service/datastore" |
| 17 |
16 "golang.org/x/net/context" | 18 "golang.org/x/net/context" |
17 ) | 19 ) |
18 | 20 |
19 // RunMutation immediately runs the Mutation `m` in a transaction. This method | 21 // RunMutation immediately runs the Mutation `m` in a transaction. This method |
20 // should be used to start a tumble chain when you have transactional checks | 22 // should be used to start a tumble chain when you have transactional checks |
21 // to do (e.g. `m` implements the first transactional link in the chain). | 23 // to do (e.g. `m` implements the first transactional link in the chain). |
22 // | 24 // |
23 // Usually this is called from your application's handlers to begin a tumble | 25 // Usually this is called from your application's handlers to begin a tumble |
24 // state machine as a result of some API interaction. | 26 // state machine as a result of some API interaction. |
25 func RunMutation(c context.Context, m Mutation) error { | 27 func RunMutation(c context.Context, m Mutation) error { |
26 cfg := getConfig(c) | 28 cfg := getConfig(c) |
27 » shardSet, _, _, err := enterTransactionInternal(txnBuf.FilterRDS(c), cfg
, m, 0) | 29 » shardSet, _, _, err := enterTransactionMutation(txnBuf.FilterRDS(c), cfg
, m, 0) |
28 if err != nil { | 30 if err != nil { |
29 return err | 31 return err |
30 } | 32 } |
31 fireTasks(c, cfg, shardSet) | 33 fireTasks(c, cfg, shardSet) |
32 return nil | 34 return nil |
33 } | 35 } |
34 | 36 |
35 func enterTransactionInternal(c context.Context, cfg *Config, m Mutation, round
uint64) (map[taskShard]struct{}, []Mutation, []*ds.Key, error) { | 37 // RunUnbuffered opens a lightweight unbuffered transaction on "root" |
36 » fromRoot := m.Root(c) | 38 // runs "fn" inside of it. Any mutations returned by "fn" will be registered |
| 39 // at the end of the transaction if "fn" doesn't return an error. |
| 40 // |
| 41 // This is useful as an initial starting point without incurring any of the |
| 42 // overhead of spinning up a new buffered transaction. |
| 43 // |
| 44 // During "fn"'s execution, standard Tumble operations such as PutNamedMutation |
| 45 // and CancelNamedMutation may be performed. |
| 46 func RunUnbuffered(c context.Context, root *ds.Key, fn func(context.Context) ([]
Mutation, error)) error { |
| 47 » cfg := getConfig(c) |
| 48 » shardSet, _, _, err := enterTransactionInternal(c, cfg, root, fn, 0) |
| 49 » if err != nil { |
| 50 » » return err |
| 51 » } |
| 52 » fireTasks(c, cfg, shardSet) |
| 53 » return nil |
| 54 } |
37 | 55 |
38 » if fromRoot == nil { | 56 func enterTransactionMutation(c context.Context, cfg *Config, m Mutation, round
uint64) ( |
39 » » return nil, nil, nil, fmt.Errorf("tumble: Passing nil as fromRoo
t is illegal") | 57 » map[taskShard]struct{}, []Mutation, []*ds.Key, error) { |
| 58 |
| 59 » return enterTransactionInternal(c, cfg, m.Root(c), m.RollForward, round) |
| 60 } |
| 61 |
| 62 func enterTransactionInternal(c context.Context, cfg *Config, root *ds.Key, fn f
unc(context.Context) ([]Mutation, error), round uint64) ( |
| 63 » map[taskShard]struct{}, []Mutation, []*ds.Key, error) { |
| 64 |
| 65 » if root == nil { |
| 66 » » return nil, nil, nil, fmt.Errorf("tumble: Passing nil as root is
illegal") |
40 } | 67 } |
41 | 68 |
42 shardSet := map[taskShard]struct{}(nil) | 69 shardSet := map[taskShard]struct{}(nil) |
43 retMuts := []Mutation(nil) | 70 retMuts := []Mutation(nil) |
44 retMutKeys := []*ds.Key(nil) | 71 retMutKeys := []*ds.Key(nil) |
45 | 72 |
46 err := ds.RunInTransaction(c, func(c context.Context) error { | 73 err := ds.RunInTransaction(c, func(c context.Context) error { |
47 » » // do a Get on the fromRoot to ensure that this transaction is a
ssociated | 74 » » // do a Get on the root to ensure that this transaction is assoc
iated |
48 // with that entity group. | 75 // with that entity group. |
49 » » _, _ = ds.Exists(c, fromRoot) | 76 » » _, _ = ds.Exists(c, root) |
50 | 77 |
51 » » muts, err := m.RollForward(c) | 78 » » muts, err := fn(c) |
52 if err != nil { | 79 if err != nil { |
53 return err | 80 return err |
54 } | 81 } |
55 | 82 |
56 retMuts = muts | 83 retMuts = muts |
57 » » shardSet, retMutKeys, err = putMutations(c, cfg, fromRoot, muts,
round) | 84 » » shardSet, retMutKeys, err = putMutations(c, cfg, root, muts, rou
nd) |
58 | 85 |
59 return err | 86 return err |
60 }, nil) | 87 }, nil) |
61 if err != nil { | 88 if err != nil { |
62 return nil, nil, nil, err | 89 return nil, nil, nil, err |
63 } | 90 } |
64 | 91 |
65 return shardSet, retMuts, retMutKeys, nil | 92 return shardSet, retMuts, retMutKeys, nil |
66 } | 93 } |
67 | 94 |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
112 // CancelNamedMutations does a best-effort cancellation of the named mutations. | 139 // CancelNamedMutations does a best-effort cancellation of the named mutations. |
113 func CancelNamedMutations(c context.Context, parent *ds.Key, names ...string) er
ror { | 140 func CancelNamedMutations(c context.Context, parent *ds.Key, names ...string) er
ror { |
114 toDel := make([]*ds.Key, 0, len(names)) | 141 toDel := make([]*ds.Key, 0, len(names)) |
115 nameSet := stringset.NewFromSlice(names...) | 142 nameSet := stringset.NewFromSlice(names...) |
116 nameSet.Iter(func(name string) bool { | 143 nameSet.Iter(func(name string) bool { |
117 toDel = append(toDel, ds.NewKey(c, "tumble.Mutation", "n:"+name,
0, parent)) | 144 toDel = append(toDel, ds.NewKey(c, "tumble.Mutation", "n:"+name,
0, parent)) |
118 return true | 145 return true |
119 }) | 146 }) |
120 return errors.Filter(ds.Delete(c, toDel), ds.ErrNoSuchEntity) | 147 return errors.Filter(ds.Delete(c, toDel), ds.ErrNoSuchEntity) |
121 } | 148 } |
OLD | NEW |