OLD | NEW |
(Empty) | |
| 1 // Copyright 2017 The LUCI Authors. |
| 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at |
| 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. |
| 14 |
| 15 // Package tq implements simple routing layer for task queue tasks. |
| 16 package tq |
| 17 |
| 18 import ( |
| 19 "bytes" |
| 20 "crypto/sha256" |
| 21 "encoding/hex" |
| 22 "encoding/json" |
| 23 "fmt" |
| 24 "io/ioutil" |
| 25 "net/http" |
| 26 "reflect" |
| 27 "strconv" |
| 28 "strings" |
| 29 "sync" |
| 30 "time" |
| 31 |
| 32 "github.com/golang/protobuf/jsonpb" |
| 33 "github.com/golang/protobuf/proto" |
| 34 "golang.org/x/net/context" |
| 35 |
| 36 "github.com/luci/gae/service/taskqueue" |
| 37 |
| 38 "github.com/luci/luci-go/appengine/gaemiddleware" |
| 39 "github.com/luci/luci-go/common/data/stringset" |
| 40 "github.com/luci/luci-go/common/errors" |
| 41 "github.com/luci/luci-go/common/logging" |
| 42 "github.com/luci/luci-go/common/retry/transient" |
| 43 "github.com/luci/luci-go/server/router" |
| 44 ) |
| 45 |
| 46 // Dispatcher submits and handles task queue tasks. |
| 47 type Dispatcher struct { |
| 48 BaseURL string // URL prefix for all URLs, "/internal/tasks/" by default |
| 49 |
| 50 mu sync.RWMutex |
| 51 handlers map[string]handler // the key is proto message type name |
| 52 } |
| 53 |
| 54 // Task contains task body and additional parameters that influence how it is |
| 55 // routed. |
| 56 type Task struct { |
| 57 // Payload is task's payload as well as indicator of its type. |
| 58 // |
| 59 // Tasks are routed based on type of the payload message, see RegisterTa
sk. |
| 60 Payload proto.Message |
| 61 |
| 62 // DeduplicationKey is optional unique key of the task. |
| 63 // |
| 64 // If a task of a given proto type with a given key has already been enq
ueued |
| 65 // recently, this task will be silently ignored. |
| 66 // |
| 67 // Such tasks can only be used outside of transactions. |
| 68 DeduplicationKey string |
| 69 |
| 70 // Title is optional string that identifies the task in HTTP logs. |
| 71 // |
| 72 // It will show up as a suffix in task handler URL. It exists exclusivel
y to |
| 73 // simplify reading HTTP logs. It serves no other purpose! In particular
, |
| 74 // it is NOT a task name. |
| 75 // |
| 76 // Handlers won't ever see it. Pass all information through the task bod
y. |
| 77 Title string |
| 78 |
| 79 // Delay specifies the duration the task queue service must wait before |
| 80 // executing the task. |
| 81 // |
| 82 // Either Delay or ETA may be set, but not both. |
| 83 Delay time.Duration |
| 84 |
| 85 // ETA specifies the earliest time a task may be executed. |
| 86 // |
| 87 // Either Delay or ETA may be set, but not both. |
| 88 ETA time.Time |
| 89 |
| 90 // Retry options for this task. |
| 91 // |
| 92 // If given, overrides default options set when this task was registered
. |
| 93 RetryOptions *taskqueue.RetryOptions |
| 94 } |
| 95 |
| 96 // Handler is called to handle one enqueued task. |
| 97 // |
| 98 // The passed context is produced by a middleware chain installed with |
| 99 // InstallHandlers. |
| 100 // |
| 101 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is |
| 102 // 1 on first execution attempt, 2 on a retry, and so on. |
| 103 // |
| 104 // May return transient errors. In this case, task queue may attempt to |
| 105 // redeliver the task (depending on RetryOptions). |
| 106 // |
| 107 // A fatal error (or success) mark the task as "done", it won't be retried. |
| 108 type Handler func(c context.Context, payload proto.Message, execCount int) error |
| 109 |
| 110 // RegisterTask tells the dispatcher that tasks of given proto type should be |
| 111 // handled by the given handler and routed through the given task queue. |
| 112 // |
| 113 // 'prototype' should be a pointer to some concrete proto message. It will be |
| 114 // used only for its type signature. |
| 115 // |
| 116 // Intended to be called during process startup. Panics if such message has |
| 117 // already been registered. |
| 118 func (d *Dispatcher) RegisterTask(prototype proto.Message, cb Handler, queue str
ing, opts *taskqueue.RetryOptions) { |
| 119 if queue == "" { |
| 120 queue = "default" // default GAE task queue name, always exists |
| 121 } |
| 122 |
| 123 name := proto.MessageName(prototype) |
| 124 if name == "" { |
| 125 panic(fmt.Sprintf("unregistered proto message type %T", prototyp
e)) |
| 126 } |
| 127 |
| 128 d.mu.Lock() |
| 129 defer d.mu.Unlock() |
| 130 |
| 131 if _, ok := d.handlers[name]; ok { |
| 132 panic(fmt.Sprintf("handler for %q has already been registered",
name)) |
| 133 } |
| 134 |
| 135 if d.handlers == nil { |
| 136 d.handlers = make(map[string]handler) |
| 137 } |
| 138 |
| 139 d.handlers[name] = handler{ |
| 140 cb: cb, |
| 141 typeName: name, |
| 142 queue: queue, |
| 143 retryOpts: opts, |
| 144 } |
| 145 } |
| 146 |
| 147 // AddTask submits given tasks to an appropriate task queue. |
| 148 // |
| 149 // It means, at some later time in some other GAE process, callbacks registered |
| 150 // as handlers for corresponding proto types will be called. |
| 151 // |
| 152 // If the given context is transactional, inherits the transaction. Note if |
| 153 // running outside of a transaction and multiple tasks are passed, the operation |
| 154 // is not atomic: it returns an error if at least one enqueue operation failed |
| 155 // (there's no way to figure out which one exactly). |
| 156 // |
| 157 // Returns only transient errors. Unlike regular Task Queue's Add, |
| 158 // ErrTaskAlreadyAdded is not considered an error. |
| 159 func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error { |
| 160 if len(tasks) == 0 { |
| 161 return nil |
| 162 } |
| 163 |
| 164 // Handle the most common case of one task in a more efficient way. |
| 165 if len(tasks) == 1 { |
| 166 t, queue, err := d.tqTask(tasks[0]) |
| 167 if err != nil { |
| 168 return err |
| 169 } |
| 170 if err := taskqueue.Add(c, queue, t); err != nil { |
| 171 if err == taskqueue.ErrTaskAlreadyAdded { |
| 172 return nil |
| 173 } |
| 174 return transient.Tag.Apply(err) |
| 175 } |
| 176 return nil |
| 177 } |
| 178 |
| 179 perQueue := map[string][]*taskqueue.Task{} |
| 180 for _, task := range tasks { |
| 181 t, queue, err := d.tqTask(task) |
| 182 if err != nil { |
| 183 return err |
| 184 } |
| 185 perQueue[queue] = append(perQueue[queue], t) |
| 186 } |
| 187 |
| 188 // Enqueue in parallel, per-queue, split into batches based on Task Queu
e |
| 189 // RPC limits (100 tasks per batch). |
| 190 errs := make(chan error) |
| 191 ops := 0 |
| 192 for q, tasks := range perQueue { |
| 193 for len(tasks) > 0 { |
| 194 count := 100 |
| 195 if count > len(tasks) { |
| 196 count = len(tasks) |
| 197 } |
| 198 go func(q string, batch []*taskqueue.Task) { |
| 199 errs <- taskqueue.Add(c, q, batch...) |
| 200 }(q, tasks[:count]) |
| 201 tasks = tasks[count:] |
| 202 ops++ |
| 203 } |
| 204 } |
| 205 |
| 206 // Gather all errors throwing away ErrTaskAlreadyAdded. |
| 207 var all errors.MultiError |
| 208 for i := 0; i < ops; i++ { |
| 209 err := <-errs |
| 210 if merr, yep := err.(errors.MultiError); yep { |
| 211 for _, e := range merr { |
| 212 if e != nil && e != taskqueue.ErrTaskAlreadyAdde
d { |
| 213 all = append(all, e) |
| 214 } |
| 215 } |
| 216 } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded { |
| 217 all = append(all, err) |
| 218 } |
| 219 } |
| 220 |
| 221 if len(all) == 0 { |
| 222 return nil |
| 223 } |
| 224 |
| 225 return transient.Tag.Apply(all) |
| 226 } |
| 227 |
| 228 // InstallRoutes installs appropriate HTTP routes in the router. |
| 229 // |
| 230 // Must be called only after all task handlers are registered! |
| 231 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain)
{ |
| 232 queues := stringset.New(0) |
| 233 |
| 234 d.mu.RLock() |
| 235 for _, h := range d.handlers { |
| 236 queues.Add(h.queue) |
| 237 } |
| 238 d.mu.RUnlock() |
| 239 |
| 240 for _, q := range queues.ToSlice() { |
| 241 r.POST( |
| 242 fmt.Sprintf("%s%s/*title", d.baseURL(), q), |
| 243 mw.Extend(gaemiddleware.RequireTaskQueue(q)), |
| 244 d.processHTTPRequest) |
| 245 } |
| 246 } |
| 247 |
| 248 //////////////////////////////////////////////////////////////////////////////// |
| 249 |
| 250 type handler struct { |
| 251 cb Handler // the actual handler |
| 252 typeName string // name of the proto type it handles |
| 253 queue string // name of the task queue |
| 254 retryOpts *taskqueue.RetryOptions // default retry options |
| 255 } |
| 256 |
| 257 // tqTask constructs task queue task struct. |
| 258 func (d *Dispatcher) tqTask(task *Task) (*taskqueue.Task, string, error) { |
| 259 handler, err := d.handler(task.Payload) |
| 260 if err != nil { |
| 261 return nil, "", err |
| 262 } |
| 263 |
| 264 blob, err := serializePayload(task.Payload) |
| 265 if err != nil { |
| 266 return nil, "", err |
| 267 } |
| 268 |
| 269 title := handler.typeName |
| 270 if task.Title != "" { |
| 271 title = task.Title |
| 272 } |
| 273 |
| 274 retryOpts := handler.retryOpts |
| 275 if task.RetryOptions != nil { |
| 276 retryOpts = task.RetryOptions |
| 277 } |
| 278 |
| 279 // There's some weird restrictions on what characters are allowed inside
task |
| 280 // names. Lexicographically close names also cause hot spot problems in
the |
| 281 // Task Queues backend. To avoid these two issues, we always use SHA256
hashes |
| 282 // as task names. Also each task kind owns its own namespace of deduplic
ation |
| 283 // keys, so add task type to the digest as well. |
| 284 name := "" |
| 285 if task.DeduplicationKey != "" { |
| 286 h := sha256.New() |
| 287 h.Write([]byte(handler.typeName)) |
| 288 h.Write([]byte{0}) |
| 289 h.Write([]byte(task.DeduplicationKey)) |
| 290 name = hex.EncodeToString(h.Sum(nil)) |
| 291 } |
| 292 |
| 293 return &taskqueue.Task{ |
| 294 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue,
title), |
| 295 Name: name, |
| 296 Method: "POST", |
| 297 Payload: blob, |
| 298 ETA: task.ETA, |
| 299 Delay: task.Delay, |
| 300 RetryOptions: retryOpts, |
| 301 }, handler.queue, nil |
| 302 } |
| 303 |
| 304 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher. |
| 305 // |
| 306 // It ends with '/'. |
| 307 func (d *Dispatcher) baseURL() string { |
| 308 switch { |
| 309 case d.BaseURL != "" && strings.HasSuffix(d.BaseURL, "/"): |
| 310 return d.BaseURL |
| 311 case d.BaseURL != "": |
| 312 return d.BaseURL + "/" |
| 313 default: |
| 314 return "/internal/tasks/" |
| 315 } |
| 316 } |
| 317 |
| 318 // handler returns a handler struct registered with Register. |
| 319 func (d *Dispatcher) handler(payload proto.Message) (handler, error) { |
| 320 name := proto.MessageName(payload) |
| 321 |
| 322 d.mu.RLock() |
| 323 defer d.mu.RUnlock() |
| 324 |
| 325 handler, registered := d.handlers[name] |
| 326 if !registered { |
| 327 return handler, fmt.Errorf("handler for %q is not registered", n
ame) |
| 328 } |
| 329 return handler, nil |
| 330 } |
| 331 |
| 332 // processHTTPRequest is invoked on each HTTP POST. |
| 333 // |
| 334 // It deserializes the task and invokes an appropriate callback. Finishes the |
| 335 // request with status 202 in case of a fatal error (to stop retries). |
| 336 func (d *Dispatcher) processHTTPRequest(c *router.Context) { |
| 337 body, err := ioutil.ReadAll(c.Request.Body) |
| 338 if err != nil { |
| 339 httpReply(c, false, 500, "Failed to read request body: %s", err) |
| 340 return |
| 341 } |
| 342 logging.Debugf(c.Context, "Received task: %s", body) |
| 343 |
| 344 payload, err := deserializePayload(body) |
| 345 if err != nil { |
| 346 httpReply(c, false, 202, "Bad payload, can't deserialize: %s", e
rr) |
| 347 return |
| 348 } |
| 349 |
| 350 h, err := d.handler(payload) |
| 351 if err != nil { |
| 352 httpReply(c, false, 202, "Bad task: %s", err) |
| 353 return |
| 354 } |
| 355 |
| 356 execCount, _ := strconv.Atoi(c.Request.Header.Get("X-AppEngine-TaskExecu
tionCount")) |
| 357 switch err = h.cb(c.Context, payload, execCount); { |
| 358 case err == nil: |
| 359 httpReply(c, true, 200, "OK") |
| 360 case transient.Tag.In(err): |
| 361 httpReply(c, false, 500, "Transient error: %s", err) |
| 362 default: |
| 363 httpReply(c, false, 202, "Fatal error: %s", err) |
| 364 } |
| 365 } |
| 366 |
| 367 func httpReply(c *router.Context, ok bool, code int, msg string, args ...interfa
ce{}) { |
| 368 body := fmt.Sprintf(msg, args...) |
| 369 if !ok { |
| 370 logging.Errorf(c.Context, "%s", body) |
| 371 } |
| 372 http.Error(c.Writer, body, code) |
| 373 } |
| 374 |
| 375 //////////////////////////////////////////////////////////////////////////////// |
| 376 |
| 377 var marshaller = jsonpb.Marshaler{} |
| 378 |
| 379 type envelope struct { |
| 380 Type string `json:"type"` |
| 381 Body *json.RawMessage `json:"body"` |
| 382 } |
| 383 |
| 384 func serializePayload(task proto.Message) ([]byte, error) { |
| 385 var buf bytes.Buffer |
| 386 if err := marshaller.Marshal(&buf, task); err != nil { |
| 387 return nil, err |
| 388 } |
| 389 raw := json.RawMessage(buf.Bytes()) |
| 390 return json.Marshal(envelope{ |
| 391 Type: proto.MessageName(task), |
| 392 Body: &raw, |
| 393 }) |
| 394 } |
| 395 |
| 396 func deserializePayload(blob []byte) (proto.Message, error) { |
| 397 env := envelope{} |
| 398 if err := json.Unmarshal(blob, &env); err != nil { |
| 399 return nil, err |
| 400 } |
| 401 |
| 402 tp := proto.MessageType(env.Type) // this is **ConcreteStruct{} |
| 403 if tp == nil { |
| 404 return nil, fmt.Errorf("unregistered proto message name %q", env
.Type) |
| 405 } |
| 406 if env.Body == nil { |
| 407 return nil, fmt.Errorf("no task body given") |
| 408 } |
| 409 |
| 410 task := reflect.New(tp.Elem()).Interface().(proto.Message) |
| 411 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil
{ |
| 412 return nil, err |
| 413 } |
| 414 |
| 415 return task, nil |
| 416 } |
OLD | NEW |