OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2017 The LUCI Authors. | |
2 // | |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 // you may not use this file except in compliance with the License. | |
5 // You may obtain a copy of the License at | |
6 // | |
7 // http://www.apache.org/licenses/LICENSE-2.0 | |
8 // | |
9 // Unless required by applicable law or agreed to in writing, software | |
10 // distributed under the License is distributed on an "AS IS" BASIS, | |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 // See the License for the specific language governing permissions and | |
13 // limitations under the License. | |
14 | |
15 // Package tq implements simple routing layer for task queue tasks. | |
16 package tq | |
17 | |
18 import ( | |
19 "bytes" | |
20 "crypto/sha256" | |
21 "encoding/hex" | |
22 "encoding/json" | |
23 "fmt" | |
24 "io/ioutil" | |
25 "net/http" | |
26 "reflect" | |
27 "strconv" | |
28 "strings" | |
29 "sync" | |
30 "time" | |
31 | |
32 "github.com/golang/protobuf/jsonpb" | |
33 "github.com/golang/protobuf/proto" | |
34 "golang.org/x/net/context" | |
35 | |
36 "github.com/luci/gae/service/taskqueue" | |
37 | |
38 "github.com/luci/luci-go/appengine/gaemiddleware" | |
39 "github.com/luci/luci-go/common/data/stringset" | |
40 "github.com/luci/luci-go/common/errors" | |
41 "github.com/luci/luci-go/common/logging" | |
42 "github.com/luci/luci-go/common/retry/transient" | |
43 "github.com/luci/luci-go/server/router" | |
44 ) | |
45 | |
46 // Dispatcher submits and handles task queue tasks. | |
47 type Dispatcher struct { | |
48 BaseURL string // URL prefix for all URLs, "/internal/tasks/" by default | |
49 | |
50 mu sync.RWMutex | |
51 handlers map[string]handler // the key is proto message type name | |
52 } | |
53 | |
54 // Task contains task body and additional parameters that influence how it is | |
55 // routed. | |
56 type Task struct { | |
57 // Payload is task's payload as well as indicator of its type. | |
58 // | |
59 // Tasks are routed based on type of the payload message, see RegisterTa sk. | |
60 Payload proto.Message | |
61 | |
62 // DeduplicationKey is optional unique key of the task. | |
63 // | |
64 // If a task of given proto type with given key has already been enqueue d | |
tandrii(chromium)
2017/07/28 09:47:17
of A given proto type
Vadim Sh.
2017/07/28 19:20:30
Done. I hate articles.
| |
65 // recently, this task will be silently ignored. | |
66 // | |
67 // Such tasks can only be used outside of transactions. | |
68 DeduplicationKey string | |
69 | |
70 // Title is optional string that identifies the task in HTTP logs. | |
71 // | |
72 // It will show up as a suffix in task handler URL. It exists exclusivel y to | |
73 // simplify reading HTTP logs. It serves no other purpose! In particular , | |
74 // it is NOT a task name. | |
75 // | |
76 // Handlers won't ever see it. Pass all information through the task bod y. | |
77 Title string | |
78 | |
79 // Delay specifies the duration the task queue service must wait before | |
80 // executing the task. | |
81 // | |
82 // Either Delay or ETA may be set, but not both. | |
83 Delay time.Duration | |
84 | |
85 // ETA specifies the earliest time a task may be executed. | |
86 // | |
87 // Either Delay or ETA may be set, but not both. | |
88 ETA time.Time | |
89 | |
90 // Retry options for this task. | |
91 // | |
92 // If given, overrides default options set when this task was registered . | |
93 RetryOptions *taskqueue.RetryOptions | |
94 } | |
95 | |
96 // Handler is called to handle one enqueued task. | |
97 // | |
98 // The passed context is produced by a middleware chain installed with | |
99 // InstallHandlers. | |
100 // | |
101 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is | |
102 // 1 on first execution attempt, 2 on a retry, and so on. | |
103 // | |
104 // May return transient errors. In this case, task queue may attempt to | |
105 // redeliver the task (depending on RetryOptions). | |
106 // | |
107 // A fatal error (or success) mark the task as "done", it won't be retried. | |
108 type Handler func(c context.Context, payload proto.Message, execCount int) error | |
109 | |
110 // RegisterTask tells the dispatcher that tasks of given proto type should be | |
111 // handled by the given handler and routed through the given task queue. | |
112 // | |
113 // 'prototype' should be a pointer to some concrete proto message. It will be | |
114 // used only for its type signature. | |
115 // | |
116 // Intended to be called during process startup. Panics if such message has | |
117 // already been registered. | |
118 func (d *Dispatcher) RegisterTask(prototype proto.Message, cb Handler, queue str ing, opts *taskqueue.RetryOptions) { | |
119 if queue == "" { | |
120 queue = "default" // default GAE task queue name, always exists | |
121 } | |
122 | |
123 name := proto.MessageName(prototype) | |
124 if name == "" { | |
125 panic(fmt.Sprintf("unregistered proto message type %T", prototyp e)) | |
126 } | |
127 | |
128 d.mu.Lock() | |
129 defer d.mu.Unlock() | |
130 | |
131 if _, ok := d.handlers[name]; ok { | |
132 panic(fmt.Sprintf("handler for %q has already been registered", name)) | |
133 } | |
134 | |
135 if d.handlers == nil { | |
136 d.handlers = make(map[string]handler) | |
137 } | |
138 | |
139 d.handlers[name] = handler{ | |
140 cb: cb, | |
141 typeName: name, | |
142 queue: queue, | |
143 retryOpts: opts, | |
144 } | |
145 } | |
146 | |
147 // AddTask submits given tasks to an appropriate task queue. | |
148 // | |
149 // It means, at some later time in some other GAE process, callbacks registered | |
150 // as handlers for corresponding proto types will be called. | |
151 // | |
152 // If the given context is transactional, inherits the transaction. Note if | |
153 // running outside of a transaction and multiple tasks are passed, the operation | |
154 // is not atomic: it returns an error if at least one enqueue operation failed | |
155 // (there's no way to figure out which one exactly). | |
156 // | |
157 // Returns only transient errors. Unlike regular Task Queue's Add, | |
158 // ErrTaskAlreadyAdded is not considered an error. | |
159 func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error { | |
160 if len(tasks) == 0 { | |
161 return nil | |
162 } | |
163 | |
164 // Handle the most common case of one task in a more efficient way. | |
165 if len(tasks) == 1 { | |
166 t, queue, err := d.tqTask(tasks[0]) | |
167 if err != nil { | |
168 return err | |
169 } | |
170 if err := taskqueue.Add(c, queue, t); err != nil { | |
171 if err == taskqueue.ErrTaskAlreadyAdded { | |
172 return nil | |
173 } | |
174 return transient.Tag.Apply(err) | |
175 } | |
176 return nil | |
177 } | |
178 | |
179 perQueue := map[string][]*taskqueue.Task{} | |
180 for _, task := range tasks { | |
181 t, queue, err := d.tqTask(task) | |
182 if err != nil { | |
183 return err | |
184 } | |
185 perQueue[queue] = append(perQueue[queue], t) | |
186 } | |
187 | |
188 // Enqueue in parallel, per-queue, split into batches based on Task Queu e | |
189 // RPC limits (100 tasks per batch). | |
190 errs := make(chan error) | |
191 ops := 0 | |
192 for q, tasks := range perQueue { | |
193 for len(tasks) > 0 { | |
194 count := 100 | |
195 if count > len(tasks) { | |
196 count = len(tasks) | |
197 } | |
198 go func(q string, batch []*taskqueue.Task) { | |
199 errs <- taskqueue.Add(c, q, batch...) | |
200 }(q, tasks[:count]) | |
201 tasks = tasks[count:] | |
202 ops++ | |
203 } | |
204 } | |
205 | |
206 // Gather all errors throwing away ErrTaskAlreadyAdded. | |
207 var all errors.MultiError | |
208 for i := 0; i < ops; i++ { | |
209 err := <-errs | |
210 if merr, yep := err.(errors.MultiError); yep { | |
211 for _, e := range merr { | |
212 if e != nil && e != taskqueue.ErrTaskAlreadyAdde d { | |
213 all = append(all, e) | |
214 } | |
215 } | |
216 } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded { | |
217 all = append(all, err) | |
218 } | |
219 } | |
220 | |
221 if len(all) == 0 { | |
222 return nil | |
223 } | |
224 | |
225 return transient.Tag.Apply(all) | |
226 } | |
227 | |
228 // InstallRoutes installs appropriate HTTP routes in the router. | |
229 // | |
230 // Must be called only after all task handlers are registered! | |
231 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) { | |
232 queues := stringset.New(0) | |
233 | |
234 d.mu.RLock() | |
235 for _, h := range d.handlers { | |
236 queues.Add(h.queue) | |
237 } | |
238 d.mu.RUnlock() | |
239 | |
240 for _, q := range queues.ToSlice() { | |
241 r.POST( | |
242 fmt.Sprintf("%s%s/*title", d.baseURL(), q), | |
243 mw.Extend(gaemiddleware.RequireTaskQueue(q)), | |
244 d.processHTTPRequest) | |
245 } | |
246 } | |
247 | |
248 //////////////////////////////////////////////////////////////////////////////// | |
249 | |
250 type handler struct { | |
251 cb Handler // the actual handler | |
252 typeName string // name of the proto type it handles | |
253 queue string // name of the task queue | |
254 retryOpts *taskqueue.RetryOptions // default retry options | |
255 } | |
256 | |
257 // tqTask constructs task queue task struct. | |
258 func (d *Dispatcher) tqTask(task *Task) (*taskqueue.Task, string, error) { | |
259 handler, err := d.handler(task.Payload) | |
260 if err != nil { | |
261 return nil, "", err | |
262 } | |
263 | |
264 blob, err := serializePayload(task.Payload) | |
265 if err != nil { | |
266 return nil, "", err | |
267 } | |
268 | |
269 title := handler.typeName | |
270 if task.Title != "" { | |
271 title = task.Title | |
272 } | |
273 | |
274 retryOpts := handler.retryOpts | |
275 if task.RetryOptions != nil { | |
276 retryOpts = task.RetryOptions | |
277 } | |
278 | |
279 // There's some weird restrictions on what characters are allowed inside task | |
280 // names. Lexicographically close names also cause hot spot problems in the | |
281 // Task Queues backend. To avoid these two issues, we always use SHA256 hashes | |
282 // as task names. Also each task kind owns its own namespace of deduplic ation | |
283 // keys, so add task type to the digest as well. | |
284 name := "" | |
285 if task.DeduplicationKey != "" { | |
286 h := sha256.New() | |
287 h.Write([]byte(handler.typeName)) | |
288 h.Write([]byte{0}) | |
289 h.Write([]byte(task.DeduplicationKey)) | |
290 name = hex.EncodeToString(h.Sum(nil)) | |
291 } | |
292 | |
293 return &taskqueue.Task{ | |
294 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title), | |
295 Name: name, | |
296 Method: "POST", | |
297 Payload: blob, | |
298 ETA: task.ETA, | |
299 Delay: task.Delay, | |
300 RetryOptions: retryOpts, | |
301 }, handler.queue, nil | |
302 } | |
303 | |
304 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher. | |
305 // | |
306 // It ends with '/'. | |
307 func (d *Dispatcher) baseURL() string { | |
308 switch { | |
309 case d.BaseURL != "" && strings.HasSuffix(d.BaseURL, "/"): | |
310 return d.BaseURL | |
311 case d.BaseURL != "": | |
312 return d.BaseURL + "/" | |
313 default: | |
314 return "/internal/tasks/" | |
315 } | |
316 } | |
317 | |
318 // handler returns a handler struct registered with Register. | |
319 func (d *Dispatcher) handler(payload proto.Message) (handler, error) { | |
320 name := proto.MessageName(payload) | |
321 | |
322 d.mu.RLock() | |
323 defer d.mu.RUnlock() | |
324 | |
325 handler, registered := d.handlers[name] | |
326 if !registered { | |
327 return handler, fmt.Errorf("handler for %q is not registered", n ame) | |
328 } | |
329 return handler, nil | |
330 } | |
331 | |
332 // processHTTPRequest is invoked on each HTTP POST. | |
333 // | |
334 // It deserializes the task and invokes an appropriate callback. Finishes the | |
335 // request with status 202 in case of a fatal error (to stop retries). | |
336 func (d *Dispatcher) processHTTPRequest(c *router.Context) { | |
337 body, err := ioutil.ReadAll(c.Request.Body) | |
338 if err != nil { | |
339 httpReply(c, false, 500, "Failed to read request body: %s", err) | |
340 return | |
341 } | |
342 logging.Debugf(c.Context, "Received task: %s", body) | |
343 | |
344 payload, err := deserializePayload(body) | |
345 if err != nil { | |
346 httpReply(c, false, 202, "Bad payload, can't deserialize: %s", e rr) | |
347 return | |
348 } | |
349 | |
350 h, err := d.handler(payload) | |
351 if err != nil { | |
352 httpReply(c, false, 202, "Bad task: %s", err) | |
353 return | |
354 } | |
355 | |
356 execCount, _ := strconv.Atoi(c.Request.Header.Get("X-AppEngine-TaskExecu tionCount")) | |
357 switch err = h.cb(c.Context, payload, execCount); { | |
358 case err == nil: | |
359 httpReply(c, true, 200, "OK") | |
360 case transient.Tag.In(err): | |
361 httpReply(c, false, 500, "Transient error: %s", err) | |
362 default: | |
363 httpReply(c, false, 202, "Fatal error: %s", err) | |
364 } | |
365 } | |
366 | |
367 func httpReply(c *router.Context, ok bool, code int, msg string, args ...interfa ce{}) { | |
368 body := fmt.Sprintf(msg, args...) | |
369 if !ok { | |
370 logging.Errorf(c.Context, "%s", body) | |
371 } | |
372 http.Error(c.Writer, body, code) | |
373 } | |
374 | |
375 //////////////////////////////////////////////////////////////////////////////// | |
376 | |
377 var marshaller = jsonpb.Marshaler{} | |
378 | |
379 type envelope struct { | |
380 Type string `json:"type"` | |
381 Body *json.RawMessage `json:"body"` | |
382 } | |
383 | |
384 func serializePayload(task proto.Message) ([]byte, error) { | |
385 var buf bytes.Buffer | |
386 if err := marshaller.Marshal(&buf, task); err != nil { | |
387 return nil, err | |
388 } | |
389 raw := json.RawMessage(buf.Bytes()) | |
390 return json.Marshal(envelope{ | |
391 Type: proto.MessageName(task), | |
392 Body: &raw, | |
393 }) | |
394 } | |
395 | |
396 func deserializePayload(blob []byte) (proto.Message, error) { | |
397 env := envelope{} | |
398 if err := json.Unmarshal(blob, &env); err != nil { | |
399 return nil, err | |
400 } | |
401 | |
402 tp := proto.MessageType(env.Type) // this is **ConcreteStruct{} | |
403 if tp == nil { | |
404 return nil, fmt.Errorf("unregistered proto message name %q", env .Type) | |
405 } | |
406 if env.Body == nil { | |
407 return nil, fmt.Errorf("no task body given") | |
408 } | |
409 | |
410 task := reflect.New(tp.Elem()).Interface().(proto.Message) | |
411 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil { | |
412 return nil, err | |
413 } | |
414 | |
415 return task, nil | |
416 } | |
OLD | NEW |