OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package mutate | 5 package mutate |
6 | 6 |
7 import ( | 7 import ( |
| 8 "fmt" |
| 9 |
| 10 "github.com/luci/gae/filter/txnBuf" |
8 "github.com/luci/gae/service/datastore" | 11 "github.com/luci/gae/service/datastore" |
| 12 "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
9 "github.com/luci/luci-go/appengine/cmd/dm/model" | 13 "github.com/luci/luci-go/appengine/cmd/dm/model" |
10 "github.com/luci/luci-go/appengine/tumble" | 14 "github.com/luci/luci-go/appengine/tumble" |
11 "github.com/luci/luci-go/common/api/dm/service/v1" | 15 "github.com/luci/luci-go/common/api/dm/service/v1" |
| 16 "github.com/luci/luci-go/common/errors" |
| 17 "github.com/luci/luci-go/common/logging" |
12 "golang.org/x/net/context" | 18 "golang.org/x/net/context" |
13 ) | 19 ) |
14 | 20 |
15 // ScheduleExecution is a placeholder mutation that will be an entry into the | 21 // ScheduleExecution is a placeholder mutation that will be an entry into the |
16 // Distributor scheduling state-machine. | 22 // Distributor scheduling state-machine. |
17 type ScheduleExecution struct { | 23 type ScheduleExecution struct { |
18 For *dm.Attempt_ID | 24 For *dm.Attempt_ID |
19 } | 25 } |
20 | 26 |
21 // Root implements tumble.Mutation | 27 // Root implements tumble.Mutation |
22 func (s *ScheduleExecution) Root(c context.Context) *datastore.Key { | 28 func (s *ScheduleExecution) Root(c context.Context) *datastore.Key { |
23 » return datastore.Get(c).KeyForObj(&model.Attempt{ID: *s.For}) | 29 » return model.AttemptKeyFromID(c, s.For) |
24 } | 30 } |
25 | 31 |
26 // RollForward implements tumble.Mutation | 32 // RollForward implements tumble.Mutation |
27 func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutati
on, err error) { | 33 func (s *ScheduleExecution) RollForward(c context.Context) (muts []tumble.Mutati
on, err error) { |
| 34 ds := datastore.Get(c) |
| 35 a := model.AttemptFromID(s.For) |
| 36 if err = ds.Get(a); err != nil { |
| 37 return |
| 38 } |
| 39 |
| 40 if a.State != dm.Attempt_SCHEDULING { |
| 41 return |
| 42 } |
| 43 |
| 44 q := model.QuestFromID(s.For.Quest) |
| 45 if err = txnBuf.GetNoTxn(c).Get(q); err != nil { |
| 46 return |
| 47 } |
| 48 |
| 49 reg := distributor.GetRegistry(c) |
| 50 dist, ver, err := reg.MakeDistributor(c, q.Desc.DistributorConfigName) |
| 51 if err != nil { |
| 52 return |
| 53 } |
| 54 |
| 55 a.CurExecution++ |
| 56 if err = a.ModifyState(c, dm.Attempt_EXECUTING); err != nil { |
| 57 return |
| 58 } |
| 59 |
| 60 eid := dm.NewExecutionID(s.For.Quest, s.For.Id, a.CurExecution) |
| 61 e := model.MakeExecution(c, eid, q.Desc.DistributorConfigName, ver) |
| 62 |
| 63 exAuth := &dm.Execution_Auth{Id: eid, Token: e.Token} |
| 64 |
| 65 var distTok distributor.Token |
| 66 distTok, e.TimeToStart, e.TimeToRun, e.TimeToStop, err = dist.Run( |
| 67 distributor.NewTaskDescription(c, &q.Desc, exAuth, |
| 68 distributor.PersistentState(a.PersistentState))) |
| 69 e.DistributorToken = string(distTok) |
| 70 if err != nil { |
| 71 if errors.IsTransient(err) { |
| 72 // tumble will retry us later |
| 73 logging.WithError(err).Errorf(c, "got transient error in
ScheduleExecution") |
| 74 return |
| 75 } |
| 76 origErr := err |
| 77 |
| 78 // put a and e to the transaction buffer, so that |
| 79 // FinishExecution.RollForward can see them. |
| 80 if err = ds.Put(a, e); err != nil { |
| 81 return |
| 82 } |
| 83 return NewFinishExecutionAbnormal( |
| 84 eid, dm.AbnormalFinish_REJECTED, |
| 85 fmt.Sprintf("rejected during scheduling with non-transie
nt error: %s", origErr), |
| 86 ).RollForward(c) |
| 87 } |
| 88 |
| 89 if err = ResetExecutionTimeout(c, e); err != nil { |
| 90 return |
| 91 } |
| 92 |
| 93 err = ds.Put(a, e) |
28 return | 94 return |
29 } | 95 } |
30 | 96 |
31 func init() { | 97 func init() { |
32 tumble.Register((*ScheduleExecution)(nil)) | 98 tumble.Register((*ScheduleExecution)(nil)) |
33 } | 99 } |
OLD | NEW |