Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(75)

Side by Side Diff: appengine/cmd/dm/distributor/impl/jobsim/distributor.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: work in progress Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2016 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package jobsim
6
7 import (
8 "encoding/json"
9 "fmt"
10 "io/ioutil"
11 "net/http"
12
13 "github.com/luci/gae/service/datastore"
14 "github.com/luci/gae/service/info"
15 "github.com/luci/gae/service/taskqueue"
16
17 "github.com/luci/luci-go/appengine/cmd/dm/distributor"
18 "github.com/luci/luci-go/appengine/cmd/dm/distributor/protos/jobsim"
19 "github.com/luci/luci-go/appengine/cmd/dm/enums/execution"
20
21 "golang.org/x/net/context"
22 "google.golang.org/api/pubsub/v1"
23 )
24
25 type jobsimDist struct {
26 c context.Context
27 cfg *distributor.Config
28 }
29
30 func (j *jobsimDist) getNativeConfig() *jobsim.Config {
31 return j.cfg.ImplConfig.(*jobsim.Config)
32 }
33
34 func (j *jobsimDist) Run(tsk distributor.TaskDescription) (tok distributor.Token , err error) {
35 ds := datastore.Get(j.c)
36
37 sa, err := info.Get(j.c).ServiceAccount()
38 if err != nil {
39 return
40 }
41
42 pubsub, token, err := tsk.PrepareTopic(sa)
43 if err != nil {
44 return
45 }
46
47 jtsk := &jobsimTask{
48 AttemptID: *tsk.AttemptID(),
49 ExecutionID: tsk.ExecutionID(),
50 Calculation: tsk.Payload(),
51 ExecutionState: execution.Scheduled,
52 ExecutionKey: tsk.ExecutionKey(),
53 NotifyPath: pubsub,
54 NotifyToken: token,
55 }
56
57 err = jtsk.PersistentState.FromPersistentState(tsk.PreviousState())
58 if err != nil {
59 return
60 }
61
62 id, err := ds.AllocateIDs(ds.KeyForObj(jtsk), 1)
63 if err != nil {
64 return "", err
65 }
66
67 // transactionally commit the job and a taskqueue task to execute it
68 jtsk.ID = fmt.Sprintf("%s|%d", j.getNativeConfig().Pool, id)
69 err = ds.RunInTransaction(func(c context.Context) error {
70 ds := datastore.Get(c)
71
72 err := ds.Put(jtsk)
73 if err != nil {
74 return err
75 }
76
77 tq := taskqueue.Get(c)
78 tsk := tq.NewTask(j.cfg.TQHandlerURL)
79 tsk.Payload = []byte(jtsk.ID)
80 return tq.Add(tsk, "")
81 }, nil)
82
83 tok = distributor.Token(jtsk.ID)
84 return
85 }
86
87 func (j *jobsimDist) Cancel(tok distributor.Token) error {
88 jtsk := &jobsimTask{ID: string(tok)}
89
90 cancelBody := func(ds datastore.Interface) (needWrite bool, err error) {
91 if err = ds.Get(jtsk); err != nil {
92 return
93 }
94 if jtsk.ExecutionState.IsTerminal() {
95 return
96 }
97 needWrite = true
98 return
99 }
100
101 ds := datastore.Get(j.c)
102 if needWrite, err := cancelBody(ds); err != nil || !needWrite {
103 return err
104 }
105
106 return ds.RunInTransaction(func(c context.Context) error {
107 ds := datastore.Get(c)
108 if needWrite, err := cancelBody(ds); err != nil || !needWrite {
109 return err
110 }
111 if err := jtsk.ExecutionState.Evolve(execution.Cancelled); err ! = nil {
112 return err
113 }
114 return ds.Put(jtsk)
115 }, nil)
116 }
117
118 func (j *jobsimDist) GetStatus(tok distributor.Token) (*distributor.TaskState, e rror) {
119 ds := datastore.Get(j.c)
120 jtsk := &jobsimTask{ID: string(tok)}
121 if err := ds.Get(jtsk); err != nil {
122 return nil, err
123 }
124 return &distributor.TaskState{
125 ExecutionState: jtsk.ExecutionState,
126 PersistentState: jtsk.PersistentState.ToPersistentState(),
127 }, nil
128 }
129
130 func (j *jobsimDist) InfoURL(tok distributor.Token) string {
131 return fmt.Sprintf("jobsim://%s/ver/%s/tok/%s", j.cfg.CfgName, j.cfg.Ver sion, tok)
132 }
133
134 func (j *jobsimDist) HandleNotification(msg *pubsub.PubsubMessage) (*distributor .TaskState, error) {
135 if j.getNativeConfig().DoPollback {
136 return nil, nil
137 }
138
139 props := map[string]string(nil)
140 err := json.Unmarshal([]byte(msg.Data), &props)
141 if err != nil {
142 return nil, err
143 }
144
145 return &distributor.TaskState{
146 ExecutionState: execution.FromString(props["ExecutionState"]),
147 PersistentState: distributor.PersistentState(props["PersistentSt ate"]),
148 }, nil
149 }
150
151 func (j *jobsimDist) HandleTaskQueueTask(c context.Context, r *http.Request) err or {
152 // body is a distributor.Token
153 rawTok, err := ioutil.ReadAll(r.Body)
154 if err != nil {
155 return err
156 }
157
158 ds := datastore.Get(c)
159 jtsk := &jobsimTask{ID: string(rawTok)}
160 if err := ds.Get(jtsk); err != nil {
161 return err
162 }
163
164 stage := jtsk.PersistentState.Stage
165
166 return nil
167 }
168
169 func init() {
170 distributor.Register((*jobsim.Config)(nil), func(c context.Context, cfg *distributor.Config) (distributor.D, error) {
171 return &jobsimDist{c, cfg}, nil
172 })
173 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698