Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(772)

Side by Side Diff: scheduler/appengine/engine/tq/tq.go

Issue 2981043002: Add a task queue task router to reduce amount of boilerplate. (Closed)
Patch Set: comment nit Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « scheduler/appengine/engine/internal/tq_tasks.pb.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // Package tq implements a simple routing layer for task queue tasks.
tandrii(chromium) 2017/07/17 09:54:30 once this is known to work (ie with tests :P), per
Vadim Sh. 2017/07/23 19:58:02 Maybe, if it's usable to anyone else.
16 package tq
17
18 import (
19 "bytes"
20 "crypto/sha256"
21 "encoding/hex"
22 "encoding/json"
23 "fmt"
24 "io/ioutil"
25 "net/http"
26 "reflect"
27 "strconv"
28 "strings"
29 "sync"
30 "time"
31
32 "github.com/golang/protobuf/jsonpb"
33 "github.com/golang/protobuf/proto"
34 "golang.org/x/net/context"
35
36 "github.com/luci/gae/service/taskqueue"
37
38 "github.com/luci/luci-go/appengine/gaemiddleware"
39 "github.com/luci/luci-go/common/data/stringset"
40 "github.com/luci/luci-go/common/errors"
41 "github.com/luci/luci-go/common/logging"
42 "github.com/luci/luci-go/common/retry/transient"
43 "github.com/luci/luci-go/server/router"
44 )
45
46 // Dispatcher submits and handles task queue tasks.
47 type Dispatcher struct {
48 BaseURL string // URL prefix for all URLs, "/internal/tasks/" by default
49
50 mu sync.RWMutex
51 handlers map[string]handler // the key is proto message type name
52 }
53
54 // Task contains task body and additional parameters that influence how it is
55 // routed.
56 type Task struct {
57 // Payload is task's payload as well as indicator of its type.
58 //
59 // Tasks are routed based on type of the payload message, see RegisterTa sk.
60 Payload proto.Message
61
62 // DeduplicationKey is optional unique key of the task.
63 //
64 // If a task with given key has already been enqueued recently, this tas k
65 // will be silently ignored.
66 //
67 // Such tasks can only be used outside of transactions.
68 DeduplicationKey string
69
70 // Title is optional string that identifies the task in HTTP logs.
71 //
72 // It will show up as a suffix in task handler URL. It exists exclusivel y to
73 // simplify reading HTTP logs. It serves no other purpose! In particular ,
74 // it is NOT a task name.
75 //
76 // Handlers won't ever see it. Pass all information through the task bod y.
77 Title string
78
79 // Delay specifies the duration the task queue service must wait before
80 // executing the task.
81 //
82 // Either Delay or ETA may be set, but not both.
83 Delay time.Duration
84
85 // ETA specifies the earliest time a task may be executed.
86 //
87 // Either Delay or ETA may be set, but not both.
88 ETA time.Time
89
90 // Retry options for this task.
91 //
92 // If given, overrides default options set when this task was registered .
93 RetryOptions *taskqueue.RetryOptions
94 }
95
96 // Handler is called to handle one enqueued task.
97 //
98 // The passed context is produced by a middleware chain installed with
99 // InstallHandlers.
100 //
101 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is
102 // 1 on first execution attempt, 2 on a retry, and so on.
103 //
104 // May return transient errors. In this case, task queue may attempt to
105 // redeliver the task (depending on RetryOptions).
106 //
107 // A fatal error (or success) mark the task as "done", it won't be retried.
108 type Handler func(c context.Context, payload proto.Message, execCount int) error
109
110 // RegisterTask tells the dispatcher that tasks of given proto type should be
111 // handled by the given handler and routed through the given task queue.
112 //
113 // 'prototype' should be a pointer to some concrete proto message. It will be
114 // used only for its type signature.
115 //
116 // Intended to be called during process startup. Panics if such message has
117 // already been registered.
118 func (d *Dispatcher) RegisterTask(prototype proto.Message, cb Handler, queue str ing, opts *taskqueue.RetryOptions) {
119 if queue == "" {
120 queue = "default" // default GAE task queue name, always exists
121 }
122
123 name := proto.MessageName(prototype)
124 if name == "" {
125 panic(fmt.Sprintf("unregistered proto message type %T", prototyp e))
126 }
127
128 d.mu.Lock()
129 defer d.mu.Unlock()
130
131 if _, ok := d.handlers[name]; ok {
132 panic(fmt.Sprintf("handler for %q has already been registered", name))
133 }
134
135 if d.handlers == nil {
136 d.handlers = make(map[string]handler)
137 }
138
139 d.handlers[name] = handler{
140 cb: cb,
141 queue: queue,
142 title: name,
143 retryOpts: opts,
144 }
145 }
146
147 // AddTask submits the given task to an appropriate task queue.
148 //
149 // It means that at some later time, in some other GAE process, the callback
tandrii(chromium) 2017/07/17 09:54:30 s/add/at
Vadim Sh. 2017/07/23 19:58:02 Done.
150 // registered as a handler for corresponding proto type will be called.
151 //
152 // If the given context is transactional, inherits the transaction.
153 //
154 // May return transient errors.
155 func (d *Dispatcher) AddTask(c context.Context, task *Task) error {
156 // Note: we don't reuse AddTasks here do avoid overhead on additional
157 // structures and goroutines it is using.
158 t, queue, err := d.tqTask(task)
159 if err != nil {
160 return err
161 }
162 if err := taskqueue.Add(c, queue, t); err != nil {
163 if err == taskqueue.ErrTaskAlreadyAdded {
164 return nil
165 }
166 return transient.Tag.Apply(err)
167 }
168 return nil
169 }
170
171 // AddTasks is batch variant of AddTask.
172 //
173 // Note that it is not atomic outside of transactions. Returns an error if
174 // at least one enqueue operation failed (there's no way to figure out which one
175 // exactly).
176 func (d *Dispatcher) AddTasks(c context.Context, tasks []*Task) error {
177 if len(tasks) == 0 {
178 return nil
179 }
180
181 perQueue := map[string][]*taskqueue.Task{}
182 for _, task := range tasks {
183 t, queue, err := d.tqTask(task)
184 if err != nil {
185 return err
186 }
187 perQueue[queue] = append(perQueue[queue], t)
188 }
189
190 // Enqueue in parallel, per-queue, split into batches based on Task Queu e
191 // RPC limits (100 tasks per batch).
192 errs := make(chan error)
193 ops := 0
194 for q, tasks := range perQueue {
195 for len(tasks) > 0 {
196 count := 100
197 if count > len(tasks) {
198 count = len(tasks)
199 }
200 go func(q string, batch []*taskqueue.Task) {
201 errs <- taskqueue.Add(c, q, batch...)
202 }(q, tasks[:count])
203 tasks = tasks[count:]
204 ops++
205 }
206 }
207
208 // Gather all errors throwing away ErrTaskAlreadyAdded.
209 var all errors.MultiError
210 for i := 0; i < ops; i++ {
211 err := <-errs
tandrii(chromium) 2017/07/17 09:54:30 for below lines: this really asks for MultiError.A
Vadim Sh. 2017/07/23 19:58:02 I don't like bloating interface of already quite c
212 if merr, yep := err.(errors.MultiError); yep {
tandrii(chromium) 2017/07/17 09:54:30 yep :)
213 for _, e := range merr {
214 if e != taskqueue.ErrTaskAlreadyAdded {
215 all = append(all, e)
216 }
217 }
218 } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded {
219 all = append(all, err)
220 }
221 }
222
223 if len(all) == 0 {
224 return nil
225 }
226
227 return transient.Tag.Apply(all)
228 }
229
230 // InstallRoutes installs appropriate HTTP routes in the router.
231 //
232 // Must be called only after all task handlers are registered!
233 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) {
234 queues := stringset.New(0)
235
236 d.mu.RLock()
237 for _, h := range d.handlers {
238 queues.Add(h.queue)
239 }
240 d.mu.RUnlock()
241
242 for _, q := range queues.ToSlice() {
243 r.POST(
244 fmt.Sprintf("%s%s/*title", d.baseURL(), q),
245 mw.Extend(gaemiddleware.RequireTaskQueue(q)),
246 d.processHTTPRequest)
247 }
248 }
249
250 ////////////////////////////////////////////////////////////////////////////////
251
252 type handler struct {
253 cb Handler // the actual handler
254 queue string // name of the task queue
255 title string // default task title
256 retryOpts *taskqueue.RetryOptions // default retry options
257 }
258
259 // tqTask constructs task queue task struct.
260 func (d *Dispatcher) tqTask(task *Task) (*taskqueue.Task, string, error) {
261 handler, err := d.handler(task.Payload)
262 if err != nil {
263 return nil, "", err
264 }
265
266 blob, err := serializePayload(task.Payload)
267 if err != nil {
268 return nil, "", err
269 }
270
271 title := handler.title
272 if task.Title != "" {
273 title = task.Title
274 }
275
276 retryOpts := handler.retryOpts
277 if task.RetryOptions != nil {
278 retryOpts = task.RetryOptions
279 }
280
281 // There's some weird restrictions on what characters are allowed inside task
282 // names. Lexicographically close names also cause hot spot issues in th e
283 // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
284 // for task names.
285 name := ""
286 if task.DeduplicationKey != "" {
287 h := sha256.New()
288 h.Write([]byte(task.DeduplicationKey))
289 name = hex.EncodeToString(h.Sum(nil))
290 }
291
292 return &taskqueue.Task{
293 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title),
294 Name: name,
295 Method: "POST",
296 Payload: blob,
297 ETA: task.ETA,
298 Delay: task.Delay,
299 RetryOptions: retryOpts,
300 }, handler.queue, nil
301 }
302
303 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher.
304 //
305 // It ends with '/'.
306 func (d *Dispatcher) baseURL() string {
307 switch {
308 case d.BaseURL != "" && strings.HasSuffix(d.BaseURL, "/"):
309 return d.BaseURL
310 case d.BaseURL != "":
311 return d.BaseURL + "/"
312 default:
313 return "/internal/tasks/"
314 }
315 }
316
317 // handler returns a handler struct registered with Register.
318 func (d *Dispatcher) handler(payload proto.Message) (handler, error) {
319 name := proto.MessageName(payload)
320
321 d.mu.RLock()
322 defer d.mu.RUnlock()
323
324 handler, registered := d.handlers[name]
325 if !registered {
326 return handler, fmt.Errorf("handler for %q is not registered", n ame)
327 }
328 return handler, nil
329 }
330
331 // processHTTPRequest is invoked on each HTTP POST.
332 //
333 // It deserializes the task and invokes an appropriate callback.
334 func (d *Dispatcher) processHTTPRequest(c *router.Context) {
335 body, err := ioutil.ReadAll(c.Request.Body)
336 if err != nil {
337 httpReply(c, false, 500, "Failed to read request body: %s", err)
338 return
339 }
340 logging.Debugf(c.Context, "Received task: %s", body)
341
342 payload, err := deserializePayload(body)
343 if err != nil {
344 httpReply(c, false, 400, "Bad payload, can't deserialize: %s", e rr)
345 return
346 }
347
348 h, err := d.handler(payload)
349 if err != nil {
350 httpReply(c, false, 404, "Bad task: %s", err)
351 return
352 }
353
354 execCount, _ := strconv.Atoi(c.Request.Header.Get("X-AppEngine-TaskExecu tionCount"))
355 switch err = h.cb(c.Context, payload, execCount); {
356 case err == nil:
357 httpReply(c, true, 200, "OK")
358 case transient.Tag.In(err):
359 httpReply(c, false, 500, "Transient error: %s", err)
360 default:
361 httpReply(c, false, 200, "Fatal error: %s", err) // return 200 t o stop retries
tandrii(chromium) 2017/07/17 09:54:30 so, are you saying that all our transient errors a
Vadim Sh. 2017/07/23 19:58:02 They should be when using this package. It's part
tandrii(chromium) 2017/07/26 09:51:34 Acknowledged.
362 }
363 }
364
365 func httpReply(c *router.Context, ok bool, code int, msg string, args ...interfa ce{}) {
366 body := fmt.Sprintf(msg, args...)
367 if !ok {
368 logging.Errorf(c.Context, "%s", body)
369 }
370 http.Error(c.Writer, body, code)
371 }
372
373 ////////////////////////////////////////////////////////////////////////////////
374
375 var marshaller = jsonpb.Marshaler{}
376
// envelope is the JSON wrapper a task payload travels in.
type envelope struct {
	// Type is the proto message name of the payload.
	Type string `json:"type"`
	// Body is the payload itself, kept as raw JSON so that it can be decoded
	// once the concrete message type is known.
	Body *json.RawMessage `json:"body"`
}
381
382 func serializePayload(task proto.Message) ([]byte, error) {
383 var buf bytes.Buffer
384 if err := marshaller.Marshal(&buf, task); err != nil {
385 return nil, err
386 }
387 raw := json.RawMessage(buf.Bytes())
388 return json.Marshal(envelope{
389 Type: proto.MessageName(task),
390 Body: &raw,
391 })
392 }
393
394 func deserializePayload(blob []byte) (proto.Message, error) {
395 env := envelope{}
396 if err := json.Unmarshal(blob, &env); err != nil {
397 return nil, err
398 }
399
400 tp := proto.MessageType(env.Type) // this is **ConcreteStruct{}
401 if tp == nil {
402 return nil, fmt.Errorf("unregistered proto message name %q", env .Type)
403 }
404 if env.Body == nil {
405 return nil, fmt.Errorf("no task body given")
406 }
407
408 task := reflect.New(tp.Elem()).Interface().(proto.Message)
409 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil {
410 return nil, err
411 }
412
413 return task, nil
414 }
OLDNEW
« no previous file with comments | « scheduler/appengine/engine/internal/tq_tasks.pb.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698