| Index: appengine/cmd/dm/distributor/impl/jobsim/distributor.go
|
| diff --git a/appengine/cmd/dm/distributor/impl/jobsim/distributor.go b/appengine/cmd/dm/distributor/impl/jobsim/distributor.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..858501208b11bb1d73cbeb1241cc678b9a0f4750
|
| --- /dev/null
|
| +++ b/appengine/cmd/dm/distributor/impl/jobsim/distributor.go
|
| @@ -0,0 +1,173 @@
|
| +// Copyright 2016 The Chromium Authors. All rights reserved.
|
| +// Use of this source code is governed by a BSD-style license that can be
|
| +// found in the LICENSE file.
|
| +
|
| +package jobsim
|
| +
|
| +import (
|
| + "encoding/json"
|
| + "fmt"
|
| + "io/ioutil"
|
| + "net/http"
|
| +
|
| + "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/gae/service/info"
|
| + "github.com/luci/gae/service/taskqueue"
|
| +
|
| + "github.com/luci/luci-go/appengine/cmd/dm/distributor"
|
| + "github.com/luci/luci-go/appengine/cmd/dm/distributor/protos/jobsim"
|
| + "github.com/luci/luci-go/appengine/cmd/dm/enums/execution"
|
| +
|
| + "golang.org/x/net/context"
|
| + "google.golang.org/api/pubsub/v1"
|
| +)
|
| +
|
| +type jobsimDist struct {
|
| + c context.Context
|
| + cfg *distributor.Config
|
| +}
|
| +
|
| +func (j *jobsimDist) getNativeConfig() *jobsim.Config {
|
| + return j.cfg.ImplConfig.(*jobsim.Config)
|
| +}
|
| +
|
| +func (j *jobsimDist) Run(tsk distributor.TaskDescription) (tok distributor.Token, err error) {
|
| + ds := datastore.Get(j.c)
|
| +
|
| + sa, err := info.Get(j.c).ServiceAccount()
|
| + if err != nil {
|
| + return
|
| + }
|
| +
|
| + pubsub, token, err := tsk.PrepareTopic(sa)
|
| + if err != nil {
|
| + return
|
| + }
|
| +
|
| + jtsk := &jobsimTask{
|
| + AttemptID: *tsk.AttemptID(),
|
| + ExecutionID: tsk.ExecutionID(),
|
| + Calculation: tsk.Payload(),
|
| + ExecutionState: execution.Scheduled,
|
| + ExecutionKey: tsk.ExecutionKey(),
|
| + NotifyPath: pubsub,
|
| + NotifyToken: token,
|
| + }
|
| +
|
| + err = jtsk.PersistentState.FromPersistentState(tsk.PreviousState())
|
| + if err != nil {
|
| + return
|
| + }
|
| +
|
| + id, err := ds.AllocateIDs(ds.KeyForObj(jtsk), 1)
|
| + if err != nil {
|
| + return "", err
|
| + }
|
| +
|
| + // transactionally commit the job and a taskqueue task to execute it
|
| + jtsk.ID = fmt.Sprintf("%s|%d", j.getNativeConfig().Pool, id)
|
| + err = ds.RunInTransaction(func(c context.Context) error {
|
| + ds := datastore.Get(c)
|
| +
|
| + err := ds.Put(jtsk)
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + tq := taskqueue.Get(c)
|
| + tsk := tq.NewTask(j.cfg.TQHandlerURL)
|
| + tsk.Payload = []byte(jtsk.ID)
|
| + return tq.Add(tsk, "")
|
| + }, nil)
|
| +
|
| + tok = distributor.Token(jtsk.ID)
|
| + return
|
| +}
|
| +
|
| +func (j *jobsimDist) Cancel(tok distributor.Token) error {
|
| + jtsk := &jobsimTask{ID: string(tok)}
|
| +
|
| + cancelBody := func(ds datastore.Interface) (needWrite bool, err error) {
|
| + if err = ds.Get(jtsk); err != nil {
|
| + return
|
| + }
|
| + if jtsk.ExecutionState.IsTerminal() {
|
| + return
|
| + }
|
| + needWrite = true
|
| + return
|
| + }
|
| +
|
| + ds := datastore.Get(j.c)
|
| + if needWrite, err := cancelBody(ds); err != nil || !needWrite {
|
| + return err
|
| + }
|
| +
|
| + return ds.RunInTransaction(func(c context.Context) error {
|
| + ds := datastore.Get(c)
|
| + if needWrite, err := cancelBody(ds); err != nil || !needWrite {
|
| + return err
|
| + }
|
| + if err := jtsk.ExecutionState.Evolve(execution.Cancelled); err != nil {
|
| + return err
|
| + }
|
| + return ds.Put(jtsk)
|
| + }, nil)
|
| +}
|
| +
|
| +func (j *jobsimDist) GetStatus(tok distributor.Token) (*distributor.TaskState, error) {
|
| + ds := datastore.Get(j.c)
|
| + jtsk := &jobsimTask{ID: string(tok)}
|
| + if err := ds.Get(jtsk); err != nil {
|
| + return nil, err
|
| + }
|
| + return &distributor.TaskState{
|
| + ExecutionState: jtsk.ExecutionState,
|
| + PersistentState: jtsk.PersistentState.ToPersistentState(),
|
| + }, nil
|
| +}
|
| +
|
| +func (j *jobsimDist) InfoURL(tok distributor.Token) string {
|
| + return fmt.Sprintf("jobsim://%s/ver/%s/tok/%s", j.cfg.CfgName, j.cfg.Version, tok)
|
| +}
|
| +
|
| +func (j *jobsimDist) HandleNotification(msg *pubsub.PubsubMessage) (*distributor.TaskState, error) {
|
| + if j.getNativeConfig().DoPollback {
|
| + return nil, nil
|
| + }
|
| +
|
| + props := map[string]string(nil)
|
| + err := json.Unmarshal([]byte(msg.Data), &props)
|
| + if err != nil {
|
| + return nil, err
|
| + }
|
| +
|
| + return &distributor.TaskState{
|
| + ExecutionState: execution.FromString(props["ExecutionState"]),
|
| + PersistentState: distributor.PersistentState(props["PersistentState"]),
|
| + }, nil
|
| +}
|
| +
|
| +func (j *jobsimDist) HandleTaskQueueTask(c context.Context, r *http.Request) error {
|
| + // body is a distributor.Token
|
| + rawTok, err := ioutil.ReadAll(r.Body)
|
| + if err != nil {
|
| + return err
|
| + }
|
| +
|
| + ds := datastore.Get(c)
|
| + jtsk := &jobsimTask{ID: string(rawTok)}
|
| + if err := ds.Get(jtsk); err != nil {
|
| + return err
|
| + }
|
| +
|
| + stage := jtsk.PersistentState.Stage
|
| +
|
| + return nil
|
| +}
|
| +
|
| +func init() {
|
| + distributor.Register((*jobsim.Config)(nil), func(c context.Context, cfg *distributor.Config) (distributor.D, error) {
|
| + return &jobsimDist{c, cfg}, nil
|
| + })
|
| +}
|
|
|