OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package mutate | 5 package mutate |
6 | 6 |
7 import ( | 7 import ( |
8 "github.com/luci/gae/service/datastore" | 8 "github.com/luci/gae/service/datastore" |
9 "github.com/luci/luci-go/appengine/cmd/dm/model" | 9 "github.com/luci/luci-go/appengine/cmd/dm/model" |
10 "github.com/luci/luci-go/appengine/tumble" | 10 "github.com/luci/luci-go/appengine/tumble" |
11 "github.com/luci/luci-go/common/api/dm/service/v1" | 11 "github.com/luci/luci-go/common/api/dm/service/v1" |
12 "github.com/luci/luci-go/common/bit_field" | 12 "github.com/luci/luci-go/common/bit_field" |
13 "github.com/luci/luci-go/common/grpcutil" | 13 "github.com/luci/luci-go/common/grpcutil" |
| 14 "github.com/luci/luci-go/common/logging" |
14 "golang.org/x/net/context" | 15 "golang.org/x/net/context" |
15 "google.golang.org/grpc/codes" | 16 "google.golang.org/grpc/codes" |
16 ) | 17 ) |
17 | 18 |
18 // AddDeps transactionally stops the current execution and adds one or more | 19 // AddDeps transactionally stops the current execution and adds one or more |
19 // dependencies. It assumes that, prior to execution, all Quests named by Deps | 20 // dependencies. |
20 // have already been recorded globally. | |
21 type AddDeps struct { | 21 type AddDeps struct { |
22 Auth *dm.Execution_Auth | 22 Auth *dm.Execution_Auth |
23 Quests []*model.Quest | 23 Quests []*model.Quest |
24 | 24 |
25 » // Atmpts is attempts we think are missing from the global graph. | 25 » // Attempts is attempts we think are missing from the global graph. |
26 » Atmpts *dm.AttemptList | 26 » Attempts *dm.AttemptList |
27 | 27 |
28 // Deps are fwddeps we think are missing from the auth'd attempt. | 28 // Deps are fwddeps we think are missing from the auth'd attempt. |
29 Deps *dm.AttemptList | 29 Deps *dm.AttemptList |
30 } | 30 } |
31 | 31 |
32 // Root implements tumble.Mutation | 32 // Root implements tumble.Mutation |
33 func (a *AddDeps) Root(c context.Context) *datastore.Key { | 33 func (a *AddDeps) Root(c context.Context) *datastore.Key { |
34 » return datastore.Get(c).KeyForObj(&model.Attempt{ID: *a.Auth.Id.AttemptI
D()}) | 34 » return model.AttemptKeyFromID(c, a.Auth.Id.AttemptID()) |
35 } | 35 } |
36 | 36 |
37 // RollForward implements tumble.Mutation | 37 // RollForward implements tumble.Mutation |
38 // | 38 // |
39 // This mutation is called directly, so we use return grpc errors. | 39 // This mutation is called directly, so we return grpc errors. |
40 func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er
ror) { | 40 func (a *AddDeps) RollForward(c context.Context) (muts []tumble.Mutation, err er
ror) { |
41 // Invalidate the execution key so that they can't make more API calls. | 41 // Invalidate the execution key so that they can't make more API calls. |
42 atmpt, _, err := model.InvalidateExecution(c, a.Auth) | 42 atmpt, _, err := model.InvalidateExecution(c, a.Auth) |
43 if err != nil { | 43 if err != nil { |
44 return | 44 return |
45 } | 45 } |
46 | 46 |
47 fwdDeps, err := filterExisting(c, model.FwdDepsFromList(c, a.Auth.Id.Att
emptID(), a.Deps)) | 47 fwdDeps, err := filterExisting(c, model.FwdDepsFromList(c, a.Auth.Id.Att
emptID(), a.Deps)) |
48 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "while filtering deps
") | 48 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "while filtering deps
") |
49 if err != nil || len(fwdDeps) == 0 { | 49 if err != nil || len(fwdDeps) == 0 { |
50 return | 50 return |
51 } | 51 } |
52 | 52 |
53 ds := datastore.Get(c) | 53 ds := datastore.Get(c) |
54 | 54 |
55 » atmpt.AddingDepsBitmap = bf.Make(uint32(len(fwdDeps))) | 55 » logging.Fields{"aid": atmpt.ID, "count": len(fwdDeps)}.Infof(c, "added d
eps") |
56 » atmpt.WaitingDepBitmap = bf.Make(uint32(len(fwdDeps))) | 56 » atmpt.DepMap = bf.Make(uint32(len(fwdDeps))) |
57 » atmpt.MustModifyState(c, dm.Attempt_ADDING_DEPS) | |
58 | 57 |
59 for i, fdp := range fwdDeps { | 58 for i, fdp := range fwdDeps { |
60 fdp.BitIndex = uint32(i) | 59 fdp.BitIndex = uint32(i) |
61 fdp.ForExecution = atmpt.CurExecution | 60 fdp.ForExecution = atmpt.CurExecution |
62 } | 61 } |
63 | 62 |
64 if err = ds.Put(fwdDeps); err != nil { | 63 if err = ds.Put(fwdDeps); err != nil { |
65 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin
g new fwdDeps") | 64 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin
g new fwdDeps") |
66 return | 65 return |
67 } | 66 } |
68 if err = ds.Put(atmpt); err != nil { | 67 if err = ds.Put(atmpt); err != nil { |
69 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin
g attempt") | 68 err = grpcutil.MaybeLogErr(c, err, codes.Internal, "error puttin
g attempt") |
70 return | 69 return |
71 } | 70 } |
72 | 71 |
73 » muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Atmpts.GetTo())) | 72 » mergeQuestMap := map[string]*MergeQuest(nil) |
74 » for _, d := range fwdDeps { | 73 » if len(a.Quests) > 0 { |
75 » » if nums, ok := a.Atmpts.GetTo()[d.Dependee.Quest]; ok { | 74 » » mergeQuestMap = make(map[string]*MergeQuest, len(a.Quests)) |
| 75 » » for _, q := range a.Quests { |
| 76 » » » mergeQuestMap[q.ID] = &MergeQuest{Quest: q} |
| 77 » » } |
| 78 » } |
| 79 |
| 80 » muts = make([]tumble.Mutation, 0, len(fwdDeps)+len(a.Attempts.GetTo())+l
en(a.Quests)) |
| 81 » for _, dep := range fwdDeps { |
| 82 » » toAppend := &muts |
| 83 » » if mq := mergeQuestMap[dep.Dependee.Quest]; mq != nil { |
| 84 » » » toAppend = &mq.AndThen |
| 85 » » } |
| 86 |
| 87 » » if nums, ok := a.Attempts.GetTo()[dep.Dependee.Quest]; ok { |
76 for _, n := range nums.Nums { | 88 for _, n := range nums.Nums { |
77 » » » » if n == d.Dependee.Id { | 89 » » » » if n == dep.Dependee.Id { |
78 » » » » » muts = append(muts, &EnsureAttempt{ID: &
d.Dependee}) | 90 » » » » » *toAppend = append(*toAppend, &EnsureAtt
empt{ID: &dep.Dependee}) |
79 break | 91 break |
80 } | 92 } |
81 } | 93 } |
82 } | 94 } |
83 » » muts = append(muts, &AddBackDep{ | 95 » » *toAppend = append(*toAppend, &AddBackDep{ |
84 » » » Dep: d.Edge(), | 96 » » » Dep: dep.Edge(), |
85 NeedsAck: true, | 97 NeedsAck: true, |
86 }) | 98 }) |
87 } | 99 } |
88 | 100 |
| 101 // TODO(iannucci): This could run into datastore transaction limits. We
could |
| 102 // allieviate this by only emitting a single mutation which does tail-ca
lls to |
| 103 // decrease its own, unprocessed size by emitting new MergeQuest mutatio
ns. |
| 104 for _, mut := range mergeQuestMap { |
| 105 muts = append(muts, mut) |
| 106 } |
| 107 |
89 return | 108 return |
90 } | 109 } |
91 | 110 |
92 func init() { | 111 func init() { |
93 tumble.Register((*AddDeps)(nil)) | 112 tumble.Register((*AddDeps)(nil)) |
94 } | 113 } |
OLD | NEW |