OLD | NEW |
(Empty) | |
| 1 // Copyright 2016 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. |
| 4 |
| 5 package jobsim |
| 6 |
| 7 import ( |
| 8 "encoding/json" |
| 9 "fmt" |
| 10 "io/ioutil" |
| 11 "net/http" |
| 12 |
| 13 "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/gae/service/info" |
| 15 "github.com/luci/gae/service/taskqueue" |
| 16 |
| 17 "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
| 18 "github.com/luci/luci-go/appengine/cmd/dm/distributor/protos/jobsim" |
| 19 "github.com/luci/luci-go/appengine/cmd/dm/enums/execution" |
| 20 |
| 21 "golang.org/x/net/context" |
| 22 "google.golang.org/api/pubsub/v1" |
| 23 ) |
| 24 |
| 25 type jobsimDist struct { |
| 26 c context.Context |
| 27 cfg *distributor.Config |
| 28 } |
| 29 |
| 30 func (j *jobsimDist) getNativeConfig() *jobsim.Config { |
| 31 return j.cfg.ImplConfig.(*jobsim.Config) |
| 32 } |
| 33 |
| 34 func (j *jobsimDist) Run(tsk distributor.TaskDescription) (tok distributor.Token
, err error) { |
| 35 ds := datastore.Get(j.c) |
| 36 |
| 37 sa, err := info.Get(j.c).ServiceAccount() |
| 38 if err != nil { |
| 39 return |
| 40 } |
| 41 |
| 42 pubsub, token, err := tsk.PrepareTopic(sa) |
| 43 if err != nil { |
| 44 return |
| 45 } |
| 46 |
| 47 jtsk := &jobsimTask{ |
| 48 AttemptID: *tsk.AttemptID(), |
| 49 ExecutionID: tsk.ExecutionID(), |
| 50 Calculation: tsk.Payload(), |
| 51 ExecutionState: execution.Scheduled, |
| 52 ExecutionKey: tsk.ExecutionKey(), |
| 53 NotifyPath: pubsub, |
| 54 NotifyToken: token, |
| 55 } |
| 56 |
| 57 err = jtsk.PersistentState.FromPersistentState(tsk.PreviousState()) |
| 58 if err != nil { |
| 59 return |
| 60 } |
| 61 |
| 62 id, err := ds.AllocateIDs(ds.KeyForObj(jtsk), 1) |
| 63 if err != nil { |
| 64 return "", err |
| 65 } |
| 66 |
| 67 // transactionally commit the job and a taskqueue task to execute it |
| 68 jtsk.ID = fmt.Sprintf("%s|%d", j.getNativeConfig().Pool, id) |
| 69 err = ds.RunInTransaction(func(c context.Context) error { |
| 70 ds := datastore.Get(c) |
| 71 |
| 72 err := ds.Put(jtsk) |
| 73 if err != nil { |
| 74 return err |
| 75 } |
| 76 |
| 77 tq := taskqueue.Get(c) |
| 78 tsk := tq.NewTask(j.cfg.TQHandlerURL) |
| 79 tsk.Payload = []byte(jtsk.ID) |
| 80 return tq.Add(tsk, "") |
| 81 }, nil) |
| 82 |
| 83 tok = distributor.Token(jtsk.ID) |
| 84 return |
| 85 } |
| 86 |
| 87 func (j *jobsimDist) Cancel(tok distributor.Token) error { |
| 88 jtsk := &jobsimTask{ID: string(tok)} |
| 89 |
| 90 cancelBody := func(ds datastore.Interface) (needWrite bool, err error) { |
| 91 if err = ds.Get(jtsk); err != nil { |
| 92 return |
| 93 } |
| 94 if jtsk.ExecutionState.IsTerminal() { |
| 95 return |
| 96 } |
| 97 needWrite = true |
| 98 return |
| 99 } |
| 100 |
| 101 ds := datastore.Get(j.c) |
| 102 if needWrite, err := cancelBody(ds); err != nil || !needWrite { |
| 103 return err |
| 104 } |
| 105 |
| 106 return ds.RunInTransaction(func(c context.Context) error { |
| 107 ds := datastore.Get(c) |
| 108 if needWrite, err := cancelBody(ds); err != nil || !needWrite { |
| 109 return err |
| 110 } |
| 111 if err := jtsk.ExecutionState.Evolve(execution.Cancelled); err !
= nil { |
| 112 return err |
| 113 } |
| 114 return ds.Put(jtsk) |
| 115 }, nil) |
| 116 } |
| 117 |
| 118 func (j *jobsimDist) GetStatus(tok distributor.Token) (*distributor.TaskState, e
rror) { |
| 119 ds := datastore.Get(j.c) |
| 120 jtsk := &jobsimTask{ID: string(tok)} |
| 121 if err := ds.Get(jtsk); err != nil { |
| 122 return nil, err |
| 123 } |
| 124 return &distributor.TaskState{ |
| 125 ExecutionState: jtsk.ExecutionState, |
| 126 PersistentState: jtsk.PersistentState.ToPersistentState(), |
| 127 }, nil |
| 128 } |
| 129 |
| 130 func (j *jobsimDist) InfoURL(tok distributor.Token) string { |
| 131 return fmt.Sprintf("jobsim://%s/ver/%s/tok/%s", j.cfg.CfgName, j.cfg.Ver
sion, tok) |
| 132 } |
| 133 |
| 134 func (j *jobsimDist) HandleNotification(msg *pubsub.PubsubMessage) (*distributor
.TaskState, error) { |
| 135 if j.getNativeConfig().DoPollback { |
| 136 return nil, nil |
| 137 } |
| 138 |
| 139 props := map[string]string(nil) |
| 140 err := json.Unmarshal([]byte(msg.Data), &props) |
| 141 if err != nil { |
| 142 return nil, err |
| 143 } |
| 144 |
| 145 return &distributor.TaskState{ |
| 146 ExecutionState: execution.FromString(props["ExecutionState"]), |
| 147 PersistentState: distributor.PersistentState(props["PersistentSt
ate"]), |
| 148 }, nil |
| 149 } |
| 150 |
| 151 func (j *jobsimDist) HandleTaskQueueTask(c context.Context, r *http.Request) err
or { |
| 152 // body is a distributor.Token |
| 153 rawTok, err := ioutil.ReadAll(r.Body) |
| 154 if err != nil { |
| 155 return err |
| 156 } |
| 157 |
| 158 ds := datastore.Get(c) |
| 159 jtsk := &jobsimTask{ID: string(rawTok)} |
| 160 if err := ds.Get(jtsk); err != nil { |
| 161 return err |
| 162 } |
| 163 |
| 164 stage := jtsk.PersistentState.Stage |
| 165 |
| 166 return nil |
| 167 } |
| 168 |
| 169 func init() { |
| 170 distributor.Register((*jobsim.Config)(nil), func(c context.Context, cfg
*distributor.Config) (distributor.D, error) { |
| 171 return &jobsimDist{c, cfg}, nil |
| 172 }) |
| 173 } |
OLD | NEW |