OLD | NEW |
(Empty) | |
| 1 // Copyright 2017 The LUCI Authors. |
| 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at |
| 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. |
| 14 |
| 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. |
| 16 package demo |
| 17 |
| 18 import ( |
| 19 "fmt" |
| 20 "net/http" |
| 21 "strconv" |
| 22 "time" |
| 23 |
| 24 "golang.org/x/net/context" |
| 25 |
| 26 "github.com/luci/gae/service/datastore" |
| 27 "github.com/luci/gae/service/taskqueue" |
| 28 |
| 29 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 30 "github.com/luci/luci-go/common/clock" |
| 31 "github.com/luci/luci-go/common/data/rand/mathrand" |
| 32 "github.com/luci/luci-go/common/logging" |
| 33 "github.com/luci/luci-go/server/router" |
| 34 |
| 35 "github.com/luci/luci-go/scheduler/appengine/engine/cron" |
| 36 "github.com/luci/luci-go/scheduler/appengine/schedule" |
| 37 ) |
| 38 |
| 39 type CronState struct { |
| 40 _extra datastore.PropertyMap `gae:"-,extra"` |
| 41 |
| 42 ID string `gae:"$id"` |
| 43 State cron.State `gae:",noindex"` |
| 44 } |
| 45 |
| 46 func (s *CronState) schedule() *schedule.Schedule { |
| 47 parsed, err := schedule.Parse(s.ID, 0) |
| 48 if err != nil { |
| 49 panic(err) |
| 50 } |
| 51 return parsed |
| 52 } |
| 53 |
| 54 // evolve instantiates cron.Machine, calls the callback and submits emitted |
| 55 // actions. |
| 56 func evolve(c context.Context, id string, cb func(context.Context, *cron.Machine
) error) error { |
| 57 err := datastore.RunInTransaction(c, func(c context.Context) error { |
| 58 entity := CronState{ID: id} |
| 59 if err := datastore.Get(c, &entity); err != nil && err != datast
ore.ErrNoSuchEntity { |
| 60 return err |
| 61 } |
| 62 |
| 63 machine := &cron.Machine{ |
| 64 Now: clock.Now(c), |
| 65 Schedule: entity.schedule(), |
| 66 Nonce: func() int64 { return mathrand.Get(c).Int63()
+ 1 }, |
| 67 State: entity.State, |
| 68 } |
| 69 |
| 70 if err := cb(c, machine); err != nil { |
| 71 return err |
| 72 } |
| 73 |
| 74 for _, action := range machine.Actions { |
| 75 var task *taskqueue.Task |
| 76 switch a := action.(type) { |
| 77 case cron.TickLaterAction: |
| 78 logging.Infof(c, "Scheduling tick %d after %s",
a.TickNonce, a.When.Sub(time.Now())) |
| 79 task = &taskqueue.Task{ |
| 80 Path: fmt.Sprintf("/tick/%s/%d", id, a.T
ickNonce), |
| 81 ETA: a.When, |
| 82 } |
| 83 case cron.StartInvocationAction: |
| 84 task = &taskqueue.Task{ |
| 85 Path: fmt.Sprintf("/invocation/%s", id)
, |
| 86 Delay: time.Second, // give the transact
ion time to land |
| 87 } |
| 88 default: |
| 89 panic("unknown action type") |
| 90 } |
| 91 if err := taskqueue.Add(c, "default", task); err != nil
{ |
| 92 return err |
| 93 } |
| 94 } |
| 95 |
| 96 entity.State = machine.State |
| 97 return datastore.Put(c, &entity) |
| 98 }, nil) |
| 99 |
| 100 if err != nil { |
| 101 logging.Errorf(c, "FAIL - %s", err) |
| 102 } |
| 103 return err |
| 104 } |
| 105 |
| 106 func startJob(c context.Context, id string) error { |
| 107 return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
| 108 // Forcefully restart the chain of tasks. |
| 109 m.Disable() |
| 110 m.Enable() |
| 111 return nil |
| 112 }) |
| 113 } |
| 114 |
| 115 func handleTick(c context.Context, id string, nonce int64) error { |
| 116 return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
| 117 return m.OnTimerTick(nonce) |
| 118 }) |
| 119 } |
| 120 |
| 121 func handleInvocation(c context.Context, id string) error { |
| 122 logging.Infof(c, "INVOCATION of job %q has finished!", id) |
| 123 return evolve(c, id, func(c context.Context, m *cron.Machine) error { |
| 124 m.RewindIfNecessary() |
| 125 return nil |
| 126 }) |
| 127 } |
| 128 |
| 129 func init() { |
| 130 r := router.New() |
| 131 gaemiddleware.InstallHandlers(r) |
| 132 |
| 133 // Kick-start a bunch of jobs by visiting: |
| 134 // |
| 135 // http://localhost:8080/start/with 10s interval |
| 136 // http://localhost:8080/start/with 5s interval |
| 137 // http://localhost:8080/start/0 * * * * * * * |
| 138 // |
| 139 // And the look at the logs. |
| 140 |
| 141 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context)
{ |
| 142 jobID := c.Params.ByName("JobID") |
| 143 if err := startJob(c.Context, jobID); err != nil { |
| 144 panic(err) |
| 145 } |
| 146 }) |
| 147 |
| 148 r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *rout
er.Context) { |
| 149 jobID := c.Params.ByName("JobID") |
| 150 nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10,
64) |
| 151 if err != nil { |
| 152 panic(err) |
| 153 } |
| 154 if err := handleTick(c.Context, jobID, nonce); err != nil { |
| 155 panic(err) |
| 156 } |
| 157 }) |
| 158 |
| 159 r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Co
ntext) { |
| 160 jobID := c.Params.ByName("JobID") |
| 161 if err := handleInvocation(c.Context, jobID); err != nil { |
| 162 panic(err) |
| 163 } |
| 164 }) |
| 165 |
| 166 http.DefaultServeMux.Handle("/", r) |
| 167 } |
OLD | NEW |