| Index: scheduler/appengine/engine/cron/demo/main.go
|
| diff --git a/scheduler/appengine/engine/cron/demo/main.go b/scheduler/appengine/engine/cron/demo/main.go
|
| new file mode 100644
|
| index 0000000000000000000000000000000000000000..e24f1be0978f157051a71004853be5a507801aab
|
| --- /dev/null
|
| +++ b/scheduler/appengine/engine/cron/demo/main.go
|
| @@ -0,0 +1,167 @@
|
| +// Copyright 2017 The LUCI Authors.
|
| +//
|
| +// Licensed under the Apache License, Version 2.0 (the "License");
|
| +// you may not use this file except in compliance with the License.
|
| +// You may obtain a copy of the License at
|
| +//
|
| +// http://www.apache.org/licenses/LICENSE-2.0
|
| +//
|
| +// Unless required by applicable law or agreed to in writing, software
|
| +// distributed under the License is distributed on an "AS IS" BASIS,
|
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| +// See the License for the specific language governing permissions and
|
| +// limitations under the License.
|
| +
|
| +// Package demo shows how cron.Machines can be hosted with Datastore and TQ.
|
| +package demo
|
| +
|
| +import (
|
| + "fmt"
|
| + "net/http"
|
| + "strconv"
|
| + "time"
|
| +
|
| + "golang.org/x/net/context"
|
| +
|
| + "github.com/luci/gae/service/datastore"
|
| + "github.com/luci/gae/service/taskqueue"
|
| +
|
| + "github.com/luci/luci-go/appengine/gaemiddleware"
|
| + "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/data/rand/mathrand"
|
| + "github.com/luci/luci-go/common/logging"
|
| + "github.com/luci/luci-go/server/router"
|
| +
|
| + "github.com/luci/luci-go/scheduler/appengine/engine/cron"
|
| + "github.com/luci/luci-go/scheduler/appengine/schedule"
|
| +)
|
| +
|
| +type CronState struct {
|
| + _extra datastore.PropertyMap `gae:"-,extra"`
|
| +
|
| + ID string `gae:"$id"`
|
| + State cron.State `gae:",noindex"`
|
| +}
|
| +
|
| +func (s *CronState) schedule() *schedule.Schedule {
|
| + parsed, err := schedule.Parse(s.ID, 0)
|
| + if err != nil {
|
| + panic(err)
|
| + }
|
| + return parsed
|
| +}
|
| +
|
| +// evolve instantiates cron.Machine, calls the callback and submits emitted
|
| +// actions.
|
| +func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine) error) error {
|
| + err := datastore.RunInTransaction(c, func(c context.Context) error {
|
| + entity := CronState{ID: id}
|
| + if err := datastore.Get(c, &entity); err != nil && err != datastore.ErrNoSuchEntity {
|
| + return err
|
| + }
|
| +
|
| + machine := &cron.Machine{
|
| + Now: clock.Now(c),
|
| + Schedule: entity.schedule(),
|
| + Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 },
|
| + State: entity.State,
|
| + }
|
| +
|
| + if err := cb(c, machine); err != nil {
|
| + return err
|
| + }
|
| +
|
| + for _, action := range machine.Actions {
|
| + var task *taskqueue.Task
|
| + switch a := action.(type) {
|
| + case cron.TickLaterAction:
|
| + logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
|
| + task = &taskqueue.Task{
|
| + Path: fmt.Sprintf("/tick/%s/%d", id, a.TickNonce),
|
| + ETA: a.When,
|
| + }
|
| + case cron.StartInvocationAction:
|
| + task = &taskqueue.Task{
|
| + Path: fmt.Sprintf("/invocation/%s", id),
|
| + Delay: time.Second, // give the transaction time to land
|
| + }
|
| + default:
|
| + panic("unknown action type")
|
| + }
|
| + if err := taskqueue.Add(c, "default", task); err != nil {
|
| + return err
|
| + }
|
| + }
|
| +
|
| + entity.State = machine.State
|
| + return datastore.Put(c, &entity)
|
| + }, nil)
|
| +
|
| + if err != nil {
|
| + logging.Errorf(c, "FAIL - %s", err)
|
| + }
|
| + return err
|
| +}
|
| +
|
| +func startJob(c context.Context, id string) error {
|
| + return evolve(c, id, func(c context.Context, m *cron.Machine) error {
|
| + // Forcefully restart the chain of tasks.
|
| + m.Disable()
|
| + m.Enable()
|
| + return nil
|
| + })
|
| +}
|
| +
|
| +func handleTick(c context.Context, id string, nonce int64) error {
|
| + return evolve(c, id, func(c context.Context, m *cron.Machine) error {
|
| + return m.OnTimerTick(nonce)
|
| + })
|
| +}
|
| +
|
| +func handleInvocation(c context.Context, id string) error {
|
| + logging.Infof(c, "INVOCATION of job %q has finished!", id)
|
| + return evolve(c, id, func(c context.Context, m *cron.Machine) error {
|
| + m.RewindIfNecessary()
|
| + return nil
|
| + })
|
| +}
|
| +
|
| +func init() {
|
| + r := router.New()
|
| + gaemiddleware.InstallHandlers(r)
|
| +
|
| + // Kick-start a bunch of jobs by visiting:
|
| + //
|
| + // http://localhost:8080/start/with 10s interval
|
| + // http://localhost:8080/start/with 5s interval
|
| + // http://localhost:8080/start/0 * * * * * * *
|
| + //
|
| + // And the look at the logs.
|
| +
|
| + r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
|
| + jobID := c.Params.ByName("JobID")
|
| + if err := startJob(c.Context, jobID); err != nil {
|
| + panic(err)
|
| + }
|
| + })
|
| +
|
| + r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *router.Context) {
|
| + jobID := c.Params.ByName("JobID")
|
| + nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10, 64)
|
| + if err != nil {
|
| + panic(err)
|
| + }
|
| + if err := handleTick(c.Context, jobID, nonce); err != nil {
|
| + panic(err)
|
| + }
|
| + })
|
| +
|
| + r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
|
| + jobID := c.Params.ByName("JobID")
|
| + if err := handleInvocation(c.Context, jobID); err != nil {
|
| + panic(err)
|
| + }
|
| + })
|
| +
|
| + http.DefaultServeMux.Handle("/", r)
|
| +}
|
|
|