Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2476)

Unified Diff: appengine/cmd/dm/distributor/impl/jobsim/distributor.go

Issue 1537883002: Initial distributor implementation (Closed) Base URL: https://chromium.googlesource.com/external/github.com/luci/luci-go@master
Patch Set: work in progress Created 4 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/cmd/dm/distributor/impl/jobsim/distributor.go
diff --git a/appengine/cmd/dm/distributor/impl/jobsim/distributor.go b/appengine/cmd/dm/distributor/impl/jobsim/distributor.go
new file mode 100644
index 0000000000000000000000000000000000000000..858501208b11bb1d73cbeb1241cc678b9a0f4750
--- /dev/null
+++ b/appengine/cmd/dm/distributor/impl/jobsim/distributor.go
@@ -0,0 +1,173 @@
+// Copyright 2016 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+package jobsim
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+
+ "github.com/luci/gae/service/datastore"
+ "github.com/luci/gae/service/info"
+ "github.com/luci/gae/service/taskqueue"
+
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor"
+ "github.com/luci/luci-go/appengine/cmd/dm/distributor/protos/jobsim"
+ "github.com/luci/luci-go/appengine/cmd/dm/enums/execution"
+
+ "golang.org/x/net/context"
+ "google.golang.org/api/pubsub/v1"
+)
+
+type jobsimDist struct {
+ c context.Context
+ cfg *distributor.Config
+}
+
+func (j *jobsimDist) getNativeConfig() *jobsim.Config {
+ return j.cfg.ImplConfig.(*jobsim.Config)
+}
+
+func (j *jobsimDist) Run(tsk distributor.TaskDescription) (tok distributor.Token, err error) {
+ ds := datastore.Get(j.c)
+
+ sa, err := info.Get(j.c).ServiceAccount()
+ if err != nil {
+ return
+ }
+
+ pubsub, token, err := tsk.PrepareTopic(sa)
+ if err != nil {
+ return
+ }
+
+ jtsk := &jobsimTask{
+ AttemptID: *tsk.AttemptID(),
+ ExecutionID: tsk.ExecutionID(),
+ Calculation: tsk.Payload(),
+ ExecutionState: execution.Scheduled,
+ ExecutionKey: tsk.ExecutionKey(),
+ NotifyPath: pubsub,
+ NotifyToken: token,
+ }
+
+ err = jtsk.PersistentState.FromPersistentState(tsk.PreviousState())
+ if err != nil {
+ return
+ }
+
+ id, err := ds.AllocateIDs(ds.KeyForObj(jtsk), 1)
+ if err != nil {
+ return "", err
+ }
+
+ // transactionally commit the job and a taskqueue task to execute it
+ jtsk.ID = fmt.Sprintf("%s|%d", j.getNativeConfig().Pool, id)
+ err = ds.RunInTransaction(func(c context.Context) error {
+ ds := datastore.Get(c)
+
+ err := ds.Put(jtsk)
+ if err != nil {
+ return err
+ }
+
+ tq := taskqueue.Get(c)
+ tsk := tq.NewTask(j.cfg.TQHandlerURL)
+ tsk.Payload = []byte(jtsk.ID)
+ return tq.Add(tsk, "")
+ }, nil)
+
+ tok = distributor.Token(jtsk.ID)
+ return
+}
+
+func (j *jobsimDist) Cancel(tok distributor.Token) error {
+ jtsk := &jobsimTask{ID: string(tok)}
+
+ cancelBody := func(ds datastore.Interface) (needWrite bool, err error) {
+ if err = ds.Get(jtsk); err != nil {
+ return
+ }
+ if jtsk.ExecutionState.IsTerminal() {
+ return
+ }
+ needWrite = true
+ return
+ }
+
+ ds := datastore.Get(j.c)
+ if needWrite, err := cancelBody(ds); err != nil || !needWrite {
+ return err
+ }
+
+ return ds.RunInTransaction(func(c context.Context) error {
+ ds := datastore.Get(c)
+ if needWrite, err := cancelBody(ds); err != nil || !needWrite {
+ return err
+ }
+ if err := jtsk.ExecutionState.Evolve(execution.Cancelled); err != nil {
+ return err
+ }
+ return ds.Put(jtsk)
+ }, nil)
+}
+
+func (j *jobsimDist) GetStatus(tok distributor.Token) (*distributor.TaskState, error) {
+ ds := datastore.Get(j.c)
+ jtsk := &jobsimTask{ID: string(tok)}
+ if err := ds.Get(jtsk); err != nil {
+ return nil, err
+ }
+ return &distributor.TaskState{
+ ExecutionState: jtsk.ExecutionState,
+ PersistentState: jtsk.PersistentState.ToPersistentState(),
+ }, nil
+}
+
+func (j *jobsimDist) InfoURL(tok distributor.Token) string {
+ return fmt.Sprintf("jobsim://%s/ver/%s/tok/%s", j.cfg.CfgName, j.cfg.Version, tok)
+}
+
+func (j *jobsimDist) HandleNotification(msg *pubsub.PubsubMessage) (*distributor.TaskState, error) {
+ if j.getNativeConfig().DoPollback {
+ return nil, nil
+ }
+
+ props := map[string]string(nil)
+ err := json.Unmarshal([]byte(msg.Data), &props)
+ if err != nil {
+ return nil, err
+ }
+
+ return &distributor.TaskState{
+ ExecutionState: execution.FromString(props["ExecutionState"]),
+ PersistentState: distributor.PersistentState(props["PersistentState"]),
+ }, nil
+}
+
+func (j *jobsimDist) HandleTaskQueueTask(c context.Context, r *http.Request) error {
+ // body is a distributor.Token
+ rawTok, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ return err
+ }
+
+ ds := datastore.Get(c)
+ jtsk := &jobsimTask{ID: string(rawTok)}
+ if err := ds.Get(jtsk); err != nil {
+ return err
+ }
+
+ stage := jtsk.PersistentState.Stage
+
+ return nil
+}
+
+func init() {
+ distributor.Register((*jobsim.Config)(nil), func(c context.Context, cfg *distributor.Config) (distributor.D, error) {
+ return &jobsimDist{c, cfg}, nil
+ })
+}

Powered by Google App Engine
This is Rietveld 408576698