Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(38)

Side by Side Diff: scheduler/appengine/engine/tq/tq.go

Issue 2981043002: Add a task queue task router to reduce amount of boilerplate. (Closed)
Patch Set: tq-helper Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // Package tq implements simple routing layer for task queue tasks.
16 package tq
17
18 import (
19 "bytes"
20 "encoding/json"
21 "fmt"
22 "io/ioutil"
23 "net/http"
24 "reflect"
25 "strconv"
26 "strings"
27 "sync"
28 "time"
29
30 "github.com/golang/protobuf/jsonpb"
31 "github.com/golang/protobuf/proto"
32 "golang.org/x/net/context"
33
34 "github.com/luci/gae/service/taskqueue"
35
36 "github.com/luci/luci-go/appengine/gaemiddleware"
37 "github.com/luci/luci-go/common/data/stringset"
38 "github.com/luci/luci-go/common/logging"
39 "github.com/luci/luci-go/common/retry/transient"
40 "github.com/luci/luci-go/server/router"
41 )
42
43 // Dispatcher submits and handles task queue tasks.
44 type Dispatcher struct {
45 BaseURL string // URL prefix for all URLs, "/internal/tasks/" by default
46
47 mu sync.RWMutex
48 handlers map[string]handler // the key is proto message type name
49 }
50
51 // Task contains task body and additional parameters that influence how it is
52 // routed.
53 type Task struct {
54 // Payload is task's payload as well as indicator of its type.
55 //
56 // Tasks are routed based on type of the payload message, see RegisterTa sk.
57 Payload proto.Message
58
59 // Title is optional string that identifies the task in HTTP logs.
60 //
61 // It will show up as a suffix in task handler URL. It exists exclusivel y to
62 // simplify reading HTTP logs. It serves no other purpose! In particular ,
63 // it is NOT a task name.
64 //
65 // Handlers won't ever see it. Pass all information through the task bod y.
66 Title string
67
68 // Delay specifies the duration the task queue service must wait before
69 // executing the task.
70 //
71 // Either Delay or ETA may be set, but not both.
72 Delay time.Duration
73
74 // ETA specifies the earliest time a task may be executed.
75 //
76 // Either Delay or ETA may be set, but not both.
77 ETA time.Time
78
79 // Retry options for this task.
80 //
81 // If given, overrides default options set when this task was registered .
82 RetryOptions *taskqueue.RetryOptions
83 }
84
85 // Handler is called to handle one enqueued task.
86 //
87 // The passed context is produced by a middleware chain installed with
88 // InstallHandlers.
89 //
90 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is
91 // 1 on first execution attempt, 2 on a retry, and so on.
92 //
93 // May return transient errors. In this case, task queue may attempt to
94 // redeliver the task (depending on RetryOptions).
95 //
96 // A fatal error (or success) mark the task as "done", it won't be retried.
97 type Handler func(c context.Context, payload proto.Message, execCount int) error
98
99 // RegisterTask tells the dispatcher that tasks of given proto type should be
100 // handled by the given handler and routed through the given task queue.
101 //
102 // 'prototype' should be a pointer to some concrete proto message. It will be
103 // used only for its type signature.
104 //
105 // Intended to be called during process startup. Panics if such message has
106 // already been registered.
107 func (d *Dispatcher) RegisterTask(prototype proto.Message, cb Handler, queue str ing, opts *taskqueue.RetryOptions) {
108 if queue == "" {
109 queue = "default" // default GAE task queue name, always exists
110 }
111
112 name := proto.MessageName(prototype)
113 if name == "" {
114 panic(fmt.Sprintf("unregistered proto message type %T", prototyp e))
115 }
116
117 d.mu.Lock()
118 defer d.mu.Unlock()
119
120 if _, ok := d.handlers[name]; ok {
121 panic(fmt.Sprintf("handler for %q has already been registered", name))
122 }
123
124 if d.handlers == nil {
125 d.handlers = make(map[string]handler)
126 }
127
128 d.handlers[name] = handler{cb: cb, queue: queue, retryOpts: opts}
129 }
130
131 // AddTask submits the given task to an appropriate task queue.
132 //
133 // It means, add some later time in some other GAE process, the callback
134 // registered as a handler for corresponding proto type will be called.
135 //
136 // If the given context is transactional, inherits the transaction.
137 //
138 // May return transient errors.
139 func (d *Dispatcher) AddTask(c context.Context, task Task) error {
140 handler, err := d.handler(task.Payload)
141 if err != nil {
142 return err
143 }
144
145 blob, err := serializePayload(task.Payload)
146 if err != nil {
147 return err
148 }
149
150 title := "-"
151 if task.Title != "" {
152 title = task.Title
153 }
154
155 retryOpts := handler.retryOpts
156 if task.RetryOptions != nil {
157 retryOpts = task.RetryOptions
158 }
159
160 t := taskqueue.Task{
161 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title),
162 Method: "POST",
163 Payload: blob,
164 ETA: task.ETA,
165 Delay: task.Delay,
166 RetryOptions: retryOpts,
167 }
168 if err := taskqueue.Add(c, handler.queue, &t); err != nil {
169 return transient.Tag.Apply(err)
170 }
171
172 return nil
173 }
174
175 // InstallRoutes installs appropriate HTTP routes in the router.
176 //
177 // Must be called only after all task handlers are registered!
178 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) {
179 queues := stringset.New(0)
180
181 d.mu.RLock()
182 for _, h := range d.handlers {
183 queues.Add(h.queue)
184 }
185 d.mu.RUnlock()
186
187 for _, q := range queues.ToSlice() {
188 r.POST(
189 fmt.Sprintf("%s%s/:Title", d.baseURL(), q),
190 mw.Extend(gaemiddleware.RequireTaskQueue(q)),
191 d.processHTTPRequest)
192 }
193 }
194
195 ////////////////////////////////////////////////////////////////////////////////
196
197 type handler struct {
198 cb Handler // the actual handler
199 queue string // name of the task queue
200 retryOpts *taskqueue.RetryOptions // default retry options
201 }
202
203 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher.
204 //
205 // It ends with '/'.
206 func (d *Dispatcher) baseURL() string {
207 switch {
208 case d.BaseURL != "" && strings.HasSuffix(d.BaseURL, "/"):
209 return d.BaseURL
210 case d.BaseURL != "":
211 return d.BaseURL + "/"
212 default:
213 return "/internal/tasks/"
214 }
215 }
216
217 // handler returns a handler struct registered with Register.
218 func (d *Dispatcher) handler(payload proto.Message) (handler, error) {
219 name := proto.MessageName(payload)
220
221 d.mu.RLock()
222 defer d.mu.RUnlock()
223
224 handler, registered := d.handlers[name]
225 if !registered {
226 return handler, fmt.Errorf("handler for %q is not registered", n ame)
227 }
228 return handler, nil
229 }
230
231 // processHTTPRequest is invoked on each HTTP POST.
232 //
233 // It deserializes the task and invokes an appropriate callback.
234 func (d *Dispatcher) processHTTPRequest(c *router.Context) {
235 body, err := ioutil.ReadAll(c.Request.Body)
236 if err != nil {
237 httpReply(c, false, 500, "Failed to read request body: %s", err)
238 return
239 }
240
241 payload, err := deserializePayload(body)
242 if err != nil {
243 httpReply(c, false, 400, "Bad payload, can't deserialize: %s", e rr)
244 return
245 }
246
247 h, err := d.handler(payload)
248 if err != nil {
249 httpReply(c, false, 404, "Bad task: %s", err)
250 return
251 }
252
253 execCount, _ := strconv.Atoi(c.Request.Header.Get("X-AppEngine-TaskExecu tionCount"))
254 switch h.cb(c.Context, payload, execCount); {
255 case err == nil:
256 httpReply(c, true, 200, "OK")
257 case transient.Tag.In(err):
258 httpReply(c, false, 500, "Transient error: %s", err)
259 default:
260 httpReply(c, false, 200, "Fatal error: %s", err) // return 200 t o stop retries
261 }
262 }
263
264 func httpReply(c *router.Context, ok bool, code int, msg string, args ...interfa ce{}) {
265 body := fmt.Sprintf(msg, args...)
266 if !ok {
267 logging.Errorf(c.Context, "%s", body)
268 }
269 http.Error(c.Writer, body, code)
270 }
271
272 ////////////////////////////////////////////////////////////////////////////////
273
274 var marshaller = jsonpb.Marshaler{}
275
276 type envelope struct {
277 Type string `json:"type"`
278 Body *json.RawMessage `json:"body"`
279 }
280
281 func serializePayload(task proto.Message) ([]byte, error) {
282 var buf bytes.Buffer
283 if err := marshaller.Marshal(&buf, task); err != nil {
Vadim Sh. 2017/07/15 01:16:29 Using JSON to be able to see the task bodies in Cl
284 return nil, err
285 }
286 raw := json.RawMessage(buf.Bytes())
287 return json.Marshal(envelope{
288 Type: proto.MessageName(task),
289 Body: &raw,
290 })
291 }
292
293 func deserializePayload(blob []byte) (proto.Message, error) {
294 env := envelope{}
295 if err := json.Unmarshal(blob, &env); err != nil {
296 return nil, err
297 }
298
299 tp := proto.MessageType(env.Type) // this is **ConcreteStruct{}
300 if tp == nil {
301 return nil, fmt.Errorf("unregistered proto message name %q", env .Type)
302 }
303 if env.Body == nil {
304 return nil, fmt.Errorf("no task body given")
305 }
306
307 task := reflect.New(tp.Elem()).Interface().(proto.Message)
308 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil {
309 return nil, err
310 }
311
312 return task, nil
313 }
OLDNEW
« scheduler/appengine/engine/cron/demo/main.go ('K') | « scheduler/appengine/engine/tq/globals.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698