Index: appengine/cmd/dm/distributor/impl/jobsim/distributor.go |
diff --git a/appengine/cmd/dm/distributor/impl/jobsim/distributor.go b/appengine/cmd/dm/distributor/impl/jobsim/distributor.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..858501208b11bb1d73cbeb1241cc678b9a0f4750 |
--- /dev/null |
+++ b/appengine/cmd/dm/distributor/impl/jobsim/distributor.go |
@@ -0,0 +1,173 @@ |
+// Copyright 2016 The Chromium Authors. All rights reserved. |
+// Use of this source code is governed by a BSD-style license that can be |
+// found in the LICENSE file. |
+ |
+package jobsim |
+ |
+import ( |
+ "encoding/json" |
+ "fmt" |
+ "io/ioutil" |
+ "net/http" |
+ |
+ "github.com/luci/gae/service/datastore" |
+ "github.com/luci/gae/service/info" |
+ "github.com/luci/gae/service/taskqueue" |
+ |
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor" |
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor/protos/jobsim" |
+ "github.com/luci/luci-go/appengine/cmd/dm/enums/execution" |
+ |
+ "golang.org/x/net/context" |
+ "google.golang.org/api/pubsub/v1" |
+) |
+ |
+type jobsimDist struct { |
+ c context.Context |
+ cfg *distributor.Config |
+} |
+ |
+func (j *jobsimDist) getNativeConfig() *jobsim.Config { |
+ return j.cfg.ImplConfig.(*jobsim.Config) |
+} |
+ |
+func (j *jobsimDist) Run(tsk distributor.TaskDescription) (tok distributor.Token, err error) { |
+ ds := datastore.Get(j.c) |
+ |
+ sa, err := info.Get(j.c).ServiceAccount() |
+ if err != nil { |
+ return |
+ } |
+ |
+ pubsub, token, err := tsk.PrepareTopic(sa) |
+ if err != nil { |
+ return |
+ } |
+ |
+ jtsk := &jobsimTask{ |
+ AttemptID: *tsk.AttemptID(), |
+ ExecutionID: tsk.ExecutionID(), |
+ Calculation: tsk.Payload(), |
+ ExecutionState: execution.Scheduled, |
+ ExecutionKey: tsk.ExecutionKey(), |
+ NotifyPath: pubsub, |
+ NotifyToken: token, |
+ } |
+ |
+ err = jtsk.PersistentState.FromPersistentState(tsk.PreviousState()) |
+ if err != nil { |
+ return |
+ } |
+ |
+ id, err := ds.AllocateIDs(ds.KeyForObj(jtsk), 1) |
+ if err != nil { |
+ return "", err |
+ } |
+ |
+ // transactionally commit the job and a taskqueue task to execute it |
+ jtsk.ID = fmt.Sprintf("%s|%d", j.getNativeConfig().Pool, id) |
+ err = ds.RunInTransaction(func(c context.Context) error { |
+ ds := datastore.Get(c) |
+ |
+ err := ds.Put(jtsk) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ tq := taskqueue.Get(c) |
+ tsk := tq.NewTask(j.cfg.TQHandlerURL) |
+ tsk.Payload = []byte(jtsk.ID) |
+ return tq.Add(tsk, "") |
+ }, nil) |
+ |
+ tok = distributor.Token(jtsk.ID) |
+ return |
+} |
+ |
+func (j *jobsimDist) Cancel(tok distributor.Token) error { |
+ jtsk := &jobsimTask{ID: string(tok)} |
+ |
+ cancelBody := func(ds datastore.Interface) (needWrite bool, err error) { |
+ if err = ds.Get(jtsk); err != nil { |
+ return |
+ } |
+ if jtsk.ExecutionState.IsTerminal() { |
+ return |
+ } |
+ needWrite = true |
+ return |
+ } |
+ |
+ ds := datastore.Get(j.c) |
+ if needWrite, err := cancelBody(ds); err != nil || !needWrite { |
+ return err |
+ } |
+ |
+ return ds.RunInTransaction(func(c context.Context) error { |
+ ds := datastore.Get(c) |
+ if needWrite, err := cancelBody(ds); err != nil || !needWrite { |
+ return err |
+ } |
+ if err := jtsk.ExecutionState.Evolve(execution.Cancelled); err != nil { |
+ return err |
+ } |
+ return ds.Put(jtsk) |
+ }, nil) |
+} |
+ |
+func (j *jobsimDist) GetStatus(tok distributor.Token) (*distributor.TaskState, error) { |
+ ds := datastore.Get(j.c) |
+ jtsk := &jobsimTask{ID: string(tok)} |
+ if err := ds.Get(jtsk); err != nil { |
+ return nil, err |
+ } |
+ return &distributor.TaskState{ |
+ ExecutionState: jtsk.ExecutionState, |
+ PersistentState: jtsk.PersistentState.ToPersistentState(), |
+ }, nil |
+} |
+ |
+func (j *jobsimDist) InfoURL(tok distributor.Token) string { |
+ return fmt.Sprintf("jobsim://%s/ver/%s/tok/%s", j.cfg.CfgName, j.cfg.Version, tok) |
+} |
+ |
+func (j *jobsimDist) HandleNotification(msg *pubsub.PubsubMessage) (*distributor.TaskState, error) { |
+ if j.getNativeConfig().DoPollback { |
+ return nil, nil |
+ } |
+ |
+ props := map[string]string(nil) |
+ err := json.Unmarshal([]byte(msg.Data), &props) |
+ if err != nil { |
+ return nil, err |
+ } |
+ |
+ return &distributor.TaskState{ |
+ ExecutionState: execution.FromString(props["ExecutionState"]), |
+ PersistentState: distributor.PersistentState(props["PersistentState"]), |
+ }, nil |
+} |
+ |
+func (j *jobsimDist) HandleTaskQueueTask(c context.Context, r *http.Request) error { |
+ // body is a distributor.Token |
+ rawTok, err := ioutil.ReadAll(r.Body) |
+ if err != nil { |
+ return err |
+ } |
+ |
+ ds := datastore.Get(c) |
+ jtsk := &jobsimTask{ID: string(rawTok)} |
+ if err := ds.Get(jtsk); err != nil { |
+ return err |
+ } |
+ |
+ stage := jtsk.PersistentState.Stage |
+ |
+ return nil |
+} |
+ |
+func init() { |
+ distributor.Register((*jobsim.Config)(nil), func(c context.Context, cfg *distributor.Config) (distributor.D, error) { |
+ return &jobsimDist{c, cfg}, nil |
+ }) |
+} |