Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package tumble | 5 package tumble |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 | 9 |
| 10 "github.com/luci/gae/filter/txnBuf" | 10 "github.com/luci/gae/filter/txnBuf" |
| 11 ds "github.com/luci/gae/service/datastore" | 11 ds "github.com/luci/gae/service/datastore" |
| 12 "github.com/luci/luci-go/common/clock" | 12 "github.com/luci/luci-go/common/clock" |
| 13 "github.com/luci/luci-go/common/data/stringset" | 13 "github.com/luci/luci-go/common/data/stringset" |
| 14 "github.com/luci/luci-go/common/errors" | 14 "github.com/luci/luci-go/common/errors" |
| 15 "github.com/luci/luci-go/common/logging" | 15 "github.com/luci/luci-go/common/logging" |
| 16 "golang.org/x/net/context" | 16 "golang.org/x/net/context" |
| 17 ) | 17 ) |
| 18 | 18 |
| 19 // RunMutation immediately runs the Mutation `m` in a transaction. This method | 19 // RunMutation immediately runs the Mutation `m` in a transaction. This method |
| 20 // should be used to start a tumble chain when you have transactional checks | 20 // should be used to start a tumble chain when you have transactional checks |
| 21 // to do (e.g. `m` implements the first transactional link in the chain). | 21 // to do (e.g. `m` implements the first transactional link in the chain). |
| 22 // | 22 // |
| 23 // Usually this is called from your application's handlers to begin a tumble | 23 // Usually this is called from your application's handlers to begin a tumble |
| 24 // state machine as a result of some API interaction. | 24 // state machine as a result of some API interaction. |
| 25 func RunMutation(c context.Context, m Mutation) error { | 25 func RunMutation(c context.Context, m Mutation) error { |
| 26 cfg := getConfig(c) | 26 cfg := getConfig(c) |
| 27 » shardSet, _, _, err := enterTransactionInternal(txnBuf.FilterRDS(c), cfg , m, 0) | 27 » shardSet, _, _, err := enterTransactionMutation(txnBuf.FilterRDS(c), cfg , m, 0) |
| 28 if err != nil { | 28 if err != nil { |
| 29 return err | 29 return err |
| 30 } | 30 } |
| 31 fireTasks(c, cfg, shardSet) | 31 fireTasks(c, cfg, shardSet) |
| 32 return nil | 32 return nil |
| 33 } | 33 } |
| 34 | 34 |
| 35 func enterTransactionInternal(c context.Context, cfg *Config, m Mutation, round uint64) (map[taskShard]struct{}, []Mutation, []*ds.Key, error) { | 35 // RegisterInTransaction opens a lightweight unbuffered transaction on "root" |
|
iannucci
2016/12/21 10:05:19
Why not 'RunUnbuffered'? There's no registration h
| |
| 36 » fromRoot := m.Root(c) | 36 // runs "fn" inside of it. Any mutations returned by "fn" will be registered |
| 37 // at the end of the transaction if "fn" doesn't return an error. | |
| 38 // | |
| 39 // This is useful as an initial starting point without incurring all of the | |
| 40 // overhead of spinning up a new buffered transaction. | |
|
iannucci
2016/12/21 10:05:19
s/all of the//
| |
| 41 // | |
| 42 // During "fn"'s execution, standard Tumble operations such as PutNamedMutation | |
| 43 // and CancelNamedMutation may be performed. | |
| 44 func RegisterInTransaction(c context.Context, root *ds.Key, fn func(context.Cont ext) ([]Mutation, error)) error { | |
| 45 » cfg := getConfig(c) | |
| 46 » shardSet, _, _, err := enterTransactionInternal(c, cfg, root, fn, 0) | |
| 47 » if err != nil { | |
| 48 » » return err | |
| 49 » } | |
| 50 » fireTasks(c, cfg, shardSet) | |
| 51 » return nil | |
| 52 } | |
| 37 | 53 |
| 38 » if fromRoot == nil { | 54 func enterTransactionMutation(c context.Context, cfg *Config, m Mutation, round uint64) ( |
| 39 » » return nil, nil, nil, fmt.Errorf("tumble: Passing nil as fromRoo t is illegal") | 55 » map[taskShard]struct{}, []Mutation, []*ds.Key, error) { |
| 56 | |
| 57 » return enterTransactionInternal(c, cfg, m.Root(c), m.RollForward, round) | |
| 58 } | |
| 59 | |
| 60 func enterTransactionInternal(c context.Context, cfg *Config, root *ds.Key, fn f unc(context.Context) ([]Mutation, error), round uint64) ( | |
| 61 » map[taskShard]struct{}, []Mutation, []*ds.Key, error) { | |
| 62 | |
| 63 » if root == nil { | |
| 64 » » return nil, nil, nil, fmt.Errorf("tumble: Passing nil as root is illegal") | |
| 40 } | 65 } |
| 41 | 66 |
| 42 shardSet := map[taskShard]struct{}(nil) | 67 shardSet := map[taskShard]struct{}(nil) |
| 43 retMuts := []Mutation(nil) | 68 retMuts := []Mutation(nil) |
| 44 retMutKeys := []*ds.Key(nil) | 69 retMutKeys := []*ds.Key(nil) |
| 45 | 70 |
| 46 err := ds.RunInTransaction(c, func(c context.Context) error { | 71 err := ds.RunInTransaction(c, func(c context.Context) error { |
| 47 » » // do a Get on the fromRoot to ensure that this transaction is a ssociated | 72 » » // do a Get on the root to ensure that this transaction is assoc iated |
| 48 // with that entity group. | 73 // with that entity group. |
| 49 » » _, _ = ds.Exists(c, fromRoot) | 74 » » _, _ = ds.Exists(c, root) |
| 50 | 75 |
| 51 » » muts, err := m.RollForward(c) | 76 » » muts, err := fn(c) |
| 52 if err != nil { | 77 if err != nil { |
| 53 return err | 78 return err |
| 54 } | 79 } |
| 55 | 80 |
| 56 retMuts = muts | 81 retMuts = muts |
| 57 » » shardSet, retMutKeys, err = putMutations(c, cfg, fromRoot, muts, round) | 82 » » shardSet, retMutKeys, err = putMutations(c, cfg, root, muts, rou nd) |
| 58 | 83 |
| 59 return err | 84 return err |
| 60 }, nil) | 85 }, nil) |
| 61 if err != nil { | 86 if err != nil { |
| 62 return nil, nil, nil, err | 87 return nil, nil, nil, err |
| 63 } | 88 } |
| 64 | 89 |
| 65 return shardSet, retMuts, retMutKeys, nil | 90 return shardSet, retMuts, retMutKeys, nil |
| 66 } | 91 } |
| 67 | 92 |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 112 // CancelNamedMutations does a best-effort cancellation of the named mutations. | 137 // CancelNamedMutations does a best-effort cancellation of the named mutations. |
| 113 func CancelNamedMutations(c context.Context, parent *ds.Key, names ...string) er ror { | 138 func CancelNamedMutations(c context.Context, parent *ds.Key, names ...string) er ror { |
| 114 toDel := make([]*ds.Key, 0, len(names)) | 139 toDel := make([]*ds.Key, 0, len(names)) |
| 115 nameSet := stringset.NewFromSlice(names...) | 140 nameSet := stringset.NewFromSlice(names...) |
| 116 nameSet.Iter(func(name string) bool { | 141 nameSet.Iter(func(name string) bool { |
| 117 toDel = append(toDel, ds.NewKey(c, "tumble.Mutation", "n:"+name, 0, parent)) | 142 toDel = append(toDel, ds.NewKey(c, "tumble.Mutation", "n:"+name, 0, parent)) |
| 118 return true | 143 return true |
| 119 }) | 144 }) |
| 120 return errors.Filter(ds.Delete(c, toDel), ds.ErrNoSuchEntity) | 145 return errors.Filter(ds.Delete(c, toDel), ds.ErrNoSuchEntity) |
| 121 } | 146 } |
| OLD | NEW |