OLD | NEW |
---|---|
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package tumble | 5 package tumble |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 | 9 |
10 "github.com/luci/gae/filter/txnBuf" | 10 "github.com/luci/gae/filter/txnBuf" |
11 ds "github.com/luci/gae/service/datastore" | 11 ds "github.com/luci/gae/service/datastore" |
12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
13 "github.com/luci/luci-go/common/data/stringset" | 13 "github.com/luci/luci-go/common/data/stringset" |
14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
15 "github.com/luci/luci-go/common/logging" | 15 "github.com/luci/luci-go/common/logging" |
16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
17 ) | 17 ) |
18 | 18 |
19 // RunMutation immediately runs the Mutation `m` in a transaction. This method | 19 // RunMutation immediately runs the Mutation `m` in a transaction. This method |
20 // should be used to start a tumble chain when you have transactional checks | 20 // should be used to start a tumble chain when you have transactional checks |
21 // to do (e.g. `m` implements the first transactional link in the chain). | 21 // to do (e.g. `m` implements the first transactional link in the chain). |
22 // | 22 // |
23 // Usually this is called from your application's handlers to begin a tumble | 23 // Usually this is called from your application's handlers to begin a tumble |
24 // state machine as a result of some API interaction. | 24 // state machine as a result of some API interaction. |
25 func RunMutation(c context.Context, m Mutation) error { | 25 func RunMutation(c context.Context, m Mutation) error { |
26 cfg := getConfig(c) | 26 cfg := getConfig(c) |
27 » shardSet, _, _, err := enterTransactionInternal(txnBuf.FilterRDS(c), cfg , m, 0) | 27 » shardSet, _, _, err := enterTransactionMutation(txnBuf.FilterRDS(c), cfg , m, 0) |
28 if err != nil { | 28 if err != nil { |
29 return err | 29 return err |
30 } | 30 } |
31 fireTasks(c, cfg, shardSet) | 31 fireTasks(c, cfg, shardSet) |
32 return nil | 32 return nil |
33 } | 33 } |
34 | 34 |
35 func enterTransactionInternal(c context.Context, cfg *Config, m Mutation, round uint64) (map[taskShard]struct{}, []Mutation, []*ds.Key, error) { | 35 // RegisterInTransaction opens a lightweight unbuffered transaction on "root" |
iannucci
2016/12/21 10:05:19
Why not 'RunUnbuffered'? There's no registration h
| |
36 » fromRoot := m.Root(c) | 36 // runs "fn" inside of it. Any mutations returned by "fn" will be registered |
37 // at the end of the transaction if "fn" doesn't return an error. | |
38 // | |
39 // This is useful as an initial starting point without incurring all of the | |
40 // overhead of spinning up a new buffered transaction. | |
iannucci
2016/12/21 10:05:19
s/all of the//
| |
41 // | |
42 // During "fn"'s execution, standard Tumble operations such as PutNamedMutation | |
43 // and CancelNamedMutation may be performed. | |
44 func RegisterInTransaction(c context.Context, root *ds.Key, fn func(context.Cont ext) ([]Mutation, error)) error { | |
45 » cfg := getConfig(c) | |
46 » shardSet, _, _, err := enterTransactionInternal(c, cfg, root, fn, 0) | |
47 » if err != nil { | |
48 » » return err | |
49 » } | |
50 » fireTasks(c, cfg, shardSet) | |
51 » return nil | |
52 } | |
37 | 53 |
38 » if fromRoot == nil { | 54 func enterTransactionMutation(c context.Context, cfg *Config, m Mutation, round uint64) ( |
39 » » return nil, nil, nil, fmt.Errorf("tumble: Passing nil as fromRoo t is illegal") | 55 » map[taskShard]struct{}, []Mutation, []*ds.Key, error) { |
56 | |
57 » return enterTransactionInternal(c, cfg, m.Root(c), m.RollForward, round) | |
58 } | |
59 | |
60 func enterTransactionInternal(c context.Context, cfg *Config, root *ds.Key, fn f unc(context.Context) ([]Mutation, error), round uint64) ( | |
61 » map[taskShard]struct{}, []Mutation, []*ds.Key, error) { | |
62 | |
63 » if root == nil { | |
64 » » return nil, nil, nil, fmt.Errorf("tumble: Passing nil as root is illegal") | |
40 } | 65 } |
41 | 66 |
42 shardSet := map[taskShard]struct{}(nil) | 67 shardSet := map[taskShard]struct{}(nil) |
43 retMuts := []Mutation(nil) | 68 retMuts := []Mutation(nil) |
44 retMutKeys := []*ds.Key(nil) | 69 retMutKeys := []*ds.Key(nil) |
45 | 70 |
46 err := ds.RunInTransaction(c, func(c context.Context) error { | 71 err := ds.RunInTransaction(c, func(c context.Context) error { |
47 » » // do a Get on the fromRoot to ensure that this transaction is a ssociated | 72 » » // do a Get on the root to ensure that this transaction is assoc iated |
48 // with that entity group. | 73 // with that entity group. |
49 » » _, _ = ds.Exists(c, fromRoot) | 74 » » _, _ = ds.Exists(c, root) |
50 | 75 |
51 » » muts, err := m.RollForward(c) | 76 » » muts, err := fn(c) |
52 if err != nil { | 77 if err != nil { |
53 return err | 78 return err |
54 } | 79 } |
55 | 80 |
56 retMuts = muts | 81 retMuts = muts |
57 » » shardSet, retMutKeys, err = putMutations(c, cfg, fromRoot, muts, round) | 82 » » shardSet, retMutKeys, err = putMutations(c, cfg, root, muts, rou nd) |
58 | 83 |
59 return err | 84 return err |
60 }, nil) | 85 }, nil) |
61 if err != nil { | 86 if err != nil { |
62 return nil, nil, nil, err | 87 return nil, nil, nil, err |
63 } | 88 } |
64 | 89 |
65 return shardSet, retMuts, retMutKeys, nil | 90 return shardSet, retMuts, retMutKeys, nil |
66 } | 91 } |
67 | 92 |
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
112 // CancelNamedMutations does a best-effort cancellation of the named mutations. | 137 // CancelNamedMutations does a best-effort cancellation of the named mutations. |
113 func CancelNamedMutations(c context.Context, parent *ds.Key, names ...string) er ror { | 138 func CancelNamedMutations(c context.Context, parent *ds.Key, names ...string) er ror { |
114 toDel := make([]*ds.Key, 0, len(names)) | 139 toDel := make([]*ds.Key, 0, len(names)) |
115 nameSet := stringset.NewFromSlice(names...) | 140 nameSet := stringset.NewFromSlice(names...) |
116 nameSet.Iter(func(name string) bool { | 141 nameSet.Iter(func(name string) bool { |
117 toDel = append(toDel, ds.NewKey(c, "tumble.Mutation", "n:"+name, 0, parent)) | 142 toDel = append(toDel, ds.NewKey(c, "tumble.Mutation", "n:"+name, 0, parent)) |
118 return true | 143 return true |
119 }) | 144 }) |
120 return errors.Filter(ds.Delete(c, toDel), ds.ErrNoSuchEntity) | 145 return errors.Filter(ds.Delete(c, toDel), ds.ErrNoSuchEntity) |
121 } | 146 } |
OLD | NEW |