Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(58)

Side by Side Diff: scheduler/appengine/engine/cron/demo/main.go

Issue 2981043002: Add a task queue task router to reduce amount of boilerplate. (Closed)
Patch Set: tq-helper Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2017 The LUCI Authors. 1 // Copyright 2017 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and 12 // See the License for the specific language governing permissions and
13 // limitations under the License. 13 // limitations under the License.
14 14
15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ. 15 // Package demo shows how cron.Machines can be hosted with Datastore and TQ.
16 package demo 16 package demo
17 17
18 import ( 18 import (
19 "fmt"
20 "net/http" 19 "net/http"
21 "strconv"
22 "time" 20 "time"
23 21
24 "golang.org/x/net/context" 22 "golang.org/x/net/context"
25 23
24 "github.com/golang/protobuf/proto"
26 "github.com/luci/gae/service/datastore" 25 "github.com/luci/gae/service/datastore"
27 "github.com/luci/gae/service/taskqueue"
28 26
29 "github.com/luci/luci-go/appengine/gaemiddleware" 27 "github.com/luci/luci-go/appengine/gaemiddleware"
30 "github.com/luci/luci-go/common/clock" 28 "github.com/luci/luci-go/common/clock"
31 "github.com/luci/luci-go/common/data/rand/mathrand" 29 "github.com/luci/luci-go/common/data/rand/mathrand"
32 "github.com/luci/luci-go/common/logging" 30 "github.com/luci/luci-go/common/logging"
33 "github.com/luci/luci-go/server/router" 31 "github.com/luci/luci-go/server/router"
34 32
35 "github.com/luci/luci-go/scheduler/appengine/engine/cron" 33 "github.com/luci/luci-go/scheduler/appengine/engine/cron"
34 "github.com/luci/luci-go/scheduler/appengine/engine/internal"
35 "github.com/luci/luci-go/scheduler/appengine/engine/tq"
36 "github.com/luci/luci-go/scheduler/appengine/schedule" 36 "github.com/luci/luci-go/scheduler/appengine/schedule"
37 ) 37 )
38 38
39 type CronState struct { 39 type CronState struct {
40 _extra datastore.PropertyMap `gae:"-,extra"` 40 _extra datastore.PropertyMap `gae:"-,extra"`
41 41
42 ID string `gae:"$id"` 42 ID string `gae:"$id"`
43 State cron.State `gae:",noindex"` 43 State cron.State `gae:",noindex"`
44 } 44 }
45 45
(...skipping 19 matching lines...) Expand all
65 Schedule: entity.schedule(), 65 Schedule: entity.schedule(),
66 Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 }, 66 Nonce: func() int64 { return mathrand.Get(c).Int63() + 1 },
67 State: entity.State, 67 State: entity.State,
68 } 68 }
69 69
70 if err := cb(c, machine); err != nil { 70 if err := cb(c, machine); err != nil {
71 return err 71 return err
72 } 72 }
73 73
74 for _, action := range machine.Actions { 74 for _, action := range machine.Actions {
75 » » » var task *taskqueue.Task 75 » » » var task tq.Task
76 switch a := action.(type) { 76 switch a := action.(type) {
77 case cron.TickLaterAction: 77 case cron.TickLaterAction:
78 logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now())) 78 logging.Infof(c, "Scheduling tick %d after %s", a.TickNonce, a.When.Sub(time.Now()))
79 » » » » task = &taskqueue.Task{ 79 » » » » task = tq.Task{
80 » » » » » Path: fmt.Sprintf("/tick/%s/%d", id, a.T ickNonce), 80 » » » » » Payload: &internal.TickLaterTask{JobId: id, TickNonce: a.TickNonce},
81 » » » » » ETA: a.When, 81 » » » » » ETA: a.When,
82 } 82 }
83 case cron.StartInvocationAction: 83 case cron.StartInvocationAction:
84 » » » » task = &taskqueue.Task{ 84 » » » » task = tq.Task{
85 » » » » » Path: fmt.Sprintf("/invocation/%s", id) , 85 » » » » » Payload: &internal.StartInvocationTask{J obId: id},
86 » » » » » Delay: time.Second, // give the transact ion time to land 86 » » » » » Delay: time.Second, // give the transa ction time to land
87 } 87 }
88 default: 88 default:
89 panic("unknown action type") 89 panic("unknown action type")
90 } 90 }
91 » » » if err := taskqueue.Add(c, "default", task); err != nil { 91 » » » if err := tq.AddTask(c, task); err != nil {
92 return err 92 return err
93 } 93 }
94 } 94 }
95 95
96 entity.State = machine.State 96 entity.State = machine.State
97 return datastore.Put(c, &entity) 97 return datastore.Put(c, &entity)
98 }, nil) 98 }, nil)
99 99
100 if err != nil { 100 if err != nil {
101 logging.Errorf(c, "FAIL - %s", err) 101 logging.Errorf(c, "FAIL - %s", err)
102 } 102 }
103 return err 103 return err
104 } 104 }
105 105
106 func startJob(c context.Context, id string) error { 106 func startJob(c context.Context, id string) error {
107 return evolve(c, id, func(c context.Context, m *cron.Machine) error { 107 return evolve(c, id, func(c context.Context, m *cron.Machine) error {
108 // Forcefully restart the chain of tasks. 108 // Forcefully restart the chain of tasks.
109 m.Disable() 109 m.Disable()
110 m.Enable() 110 m.Enable()
111 return nil 111 return nil
112 }) 112 })
113 } 113 }
114 114
115 func handleTick(c context.Context, id string, nonce int64) error { 115 func handleTick(c context.Context, task proto.Message, execCount int) error {
116 » return evolve(c, id, func(c context.Context, m *cron.Machine) error { 116 » msg := task.(*internal.TickLaterTask)
117 » » return m.OnTimerTick(nonce) 117 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or {
118 » » return m.OnTimerTick(msg.TickNonce)
118 }) 119 })
119 } 120 }
120 121
121 func handleInvocation(c context.Context, id string) error { 122 func handleInvocation(c context.Context, task proto.Message, execCount int) erro r {
122 » logging.Infof(c, "INVOCATION of job %q has finished!", id) 123 » msg := task.(*internal.StartInvocationTask)
123 » return evolve(c, id, func(c context.Context, m *cron.Machine) error { 124 » logging.Infof(c, "INVOCATION of job %q has finished!", msg.JobId)
125 » return evolve(c, msg.JobId, func(c context.Context, m *cron.Machine) err or {
124 m.RewindIfNecessary() 126 m.RewindIfNecessary()
125 return nil 127 return nil
126 }) 128 })
127 } 129 }
128 130
129 func init() { 131 func init() {
130 r := router.New() 132 r := router.New()
131 gaemiddleware.InstallHandlers(r) 133 gaemiddleware.InstallHandlers(r)
132 134
135 tq.RegisterTask(&internal.TickLaterTask{}, handleTick, "default", nil)
Vadim Sh. 2017/07/15 01:16:29 task type itself is used as routing key
136 tq.RegisterTask(&internal.StartInvocationTask{}, handleInvocation, "defa ult", nil)
137 tq.DefaultDispatcher.InstallRoutes(r, gaemiddleware.BaseProd())
138
133 // Kick-start a bunch of jobs by visiting: 139 // Kick-start a bunch of jobs by visiting:
134 // 140 //
135 // http://localhost:8080/start/with 10s interval 141 // http://localhost:8080/start/with 10s interval
136 // http://localhost:8080/start/with 5s interval 142 // http://localhost:8080/start/with 5s interval
137 // http://localhost:8080/start/0 * * * * * * * 143 // http://localhost:8080/start/0 * * * * * * *
138 // 144 //
139 // And the look at the logs. 145 // And the look at the logs.
140 146
141 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) { 147 r.GET("/start/:JobID", gaemiddleware.BaseProd(), func(c *router.Context) {
142 jobID := c.Params.ByName("JobID") 148 jobID := c.Params.ByName("JobID")
143 if err := startJob(c.Context, jobID); err != nil { 149 if err := startJob(c.Context, jobID); err != nil {
144 panic(err) 150 panic(err)
145 } 151 }
146 }) 152 })
147 153
148 r.POST("/tick/:JobID/:TickNonce", gaemiddleware.BaseProd(), func(c *rout er.Context) {
149 jobID := c.Params.ByName("JobID")
150 nonce, err := strconv.ParseInt(c.Params.ByName("TickNonce"), 10, 64)
151 if err != nil {
152 panic(err)
153 }
154 if err := handleTick(c.Context, jobID, nonce); err != nil {
155 panic(err)
156 }
157 })
158
159 r.POST("/invocation/:JobID", gaemiddleware.BaseProd(), func(c *router.Co ntext) {
160 jobID := c.Params.ByName("JobID")
161 if err := handleInvocation(c.Context, jobID); err != nil {
162 panic(err)
163 }
164 })
165
166 http.DefaultServeMux.Handle("/", r) 154 http.DefaultServeMux.Handle("/", r)
167 } 155 }
OLDNEW
« no previous file with comments | « no previous file | scheduler/appengine/engine/internal/gen.go » ('j') | scheduler/appengine/engine/tq/tq.go » ('J')

Powered by Google App Engine
This is Rietveld 408576698