| OLD | NEW |
| 1 // Copyright 2017 The LUCI Authors. | 1 // Copyright 2017 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 52 } | 52 } |
| 53 | 53 |
| 54 // Task contains task body and additional parameters that influence how it is | 54 // Task contains task body and additional parameters that influence how it is |
| 55 // routed. | 55 // routed. |
| 56 type Task struct { | 56 type Task struct { |
| 57 // Payload is task's payload as well as indicator of its type. | 57 // Payload is task's payload as well as indicator of its type. |
| 58 // | 58 // |
| 59 // Tasks are routed based on type of the payload message, see RegisterTa
sk. | 59 // Tasks are routed based on type of the payload message, see RegisterTa
sk. |
| 60 Payload proto.Message | 60 Payload proto.Message |
| 61 | 61 |
| 62 // NamePrefix, if not empty, is a string that will be prefixed to the ta
sk's |
| 63 // name. Characters in NamePrefix must be appropriate task queue name |
| 64 // characters. NamePrefix can be useful because the Task Queue system al
lows |
| 65 // users to search for tasks by prefix. |
| 66 // |
| 67 // Lexicographically close names can cause hot spots in the Task Queues |
| 68 // backend. If NamePrefix is specified, users should try and ensure that |
| 69 // it is friendly to sharding (e.g., begins with a hash string). |
| 70 // |
| 71 // Setting NamePrefix and/or DeduplicationKey will result in a named tas
k |
| 72 // being generated. This task can be cancelled using DeleteTask. |
| 73 NamePrefix string |
| 74 |
| 62 // DeduplicationKey is optional unique key of the task. | 75 // DeduplicationKey is optional unique key of the task. |
| 63 // | 76 // |
| 64 // If a task of a given proto type with a given key has already been enq
ueued | 77 // If a task of a given proto type with a given key has already been enq
ueued |
| 65 // recently, this task will be silently ignored. | 78 // recently, this task will be silently ignored. |
| 66 // | 79 // |
| 67 // Such tasks can only be used outside of transactions. | 80 // Such tasks can only be used outside of transactions. |
| 81 // |
| 82 // Setting NamePrefix and/or DeduplicationKey will result in a named tas
k |
| 83 // being generated. This task can be cancelled using DeleteTask. |
| 68 DeduplicationKey string | 84 DeduplicationKey string |
| 69 | 85 |
| 70 // Title is optional string that identifies the task in HTTP logs. | 86 // Title is optional string that identifies the task in HTTP logs. |
| 71 // | 87 // |
| 72 // It will show up as a suffix in task handler URL. It exists exclusivel
y to | 88 // It will show up as a suffix in task handler URL. It exists exclusivel
y to |
| 73 // simplify reading HTTP logs. It serves no other purpose! In particular
, | 89 // simplify reading HTTP logs. It serves no other purpose! In particular
, |
| 74 // it is NOT a task name. | 90 // it is NOT a task name. |
| 75 // | 91 // |
| 76 // Handlers won't ever see it. Pass all information through the task bod
y. | 92 // Handlers won't ever see it. Pass all information through the task bod
y. |
| 77 Title string | 93 Title string |
| 78 | 94 |
| 79 // Delay specifies the duration the task queue service must wait before | 95 // Delay specifies the duration the task queue service must wait before |
| 80 // executing the task. | 96 // executing the task. |
| 81 // | 97 // |
| 82 // Either Delay or ETA may be set, but not both. | 98 // Either Delay or ETA may be set, but not both. |
| 83 Delay time.Duration | 99 Delay time.Duration |
| 84 | 100 |
| 85 // ETA specifies the earliest time a task may be executed. | 101 // ETA specifies the earliest time a task may be executed. |
| 86 // | 102 // |
| 87 // Either Delay or ETA may be set, but not both. | 103 // Either Delay or ETA may be set, but not both. |
| 88 ETA time.Time | 104 ETA time.Time |
| 89 | 105 |
| 90 // Retry options for this task. | 106 // Retry options for this task. |
| 91 // | 107 // |
| 92 // If given, overrides default options set when this task was registered
. | 108 // If given, overrides default options set when this task was registered
. |
| 93 RetryOptions *taskqueue.RetryOptions | 109 RetryOptions *taskqueue.RetryOptions |
| 94 } | 110 } |
| 95 | 111 |
| 112 // Name generates and returns the task's name. |
| 113 // |
| 114 // If the task is not a named task (doesn't have NamePrefix or DeduplicationKey |
| 115 // set), this will return an empty string. |
| 116 func (task *Task) Name() string { |
| 117 if task.NamePrefix == "" && task.DeduplicationKey == "" { |
| 118 return "" |
| 119 } |
| 120 |
| 121 parts := make([]string, 0, 2) |
| 122 |
| 123 if task.NamePrefix != "" { |
| 124 parts = append(parts, task.NamePrefix) |
| 125 } |
| 126 |
| 127 // There's some weird restrictions on what characters are allowed inside
task |
| 128 // names. Lexicographically close names also cause hot spot problems in
the |
| 129 // Task Queues backend. To avoid these two issues, we always use SHA256
hashes |
| 130 // as task names. Also each task kind owns its own namespace of deduplic
ation |
| 131 // keys, so add task type to the digest as well. |
| 132 if task.DeduplicationKey != "" { |
| 133 h := sha256.New() |
| 134 if task.Payload == nil { |
| 135 panic("task must have a Payload") |
| 136 } |
| 137 h.Write([]byte(proto.MessageName(task.Payload))) |
| 138 h.Write([]byte{0}) |
| 139 h.Write([]byte(task.DeduplicationKey)) |
| 140 parts = append(parts, hex.EncodeToString(h.Sum(nil))) |
| 141 } |
| 142 |
| 143 return strings.Join(parts, "-") |
| 144 } |
| 145 |
| 96 // Handler is called to handle one enqueued task. | 146 // Handler is called to handle one enqueued task. |
| 97 // | 147 // |
| 98 // The passed context is produced by a middleware chain installed with | 148 // The passed context is produced by a middleware chain installed with |
| 99 // InstallHandlers. | 149 // InstallHandlers. |
| 100 // | 150 // |
| 101 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is | 151 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is |
| 102 // 1 on first execution attempt, 2 on a retry, and so on. | 152 // 1 on first execution attempt, 2 on a retry, and so on. |
| 103 // | 153 // |
| 104 // May return transient errors. In this case, task queue may attempt to | 154 // May return transient errors. In this case, task queue may attempt to |
| 105 // redeliver the task (depending on RetryOptions). | 155 // redeliver the task (depending on RetryOptions). |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 137 } | 187 } |
| 138 | 188 |
| 139 d.handlers[name] = handler{ | 189 d.handlers[name] = handler{ |
| 140 cb: cb, | 190 cb: cb, |
| 141 typeName: name, | 191 typeName: name, |
| 142 queue: queue, | 192 queue: queue, |
| 143 retryOpts: opts, | 193 retryOpts: opts, |
| 144 } | 194 } |
| 145 } | 195 } |
| 146 | 196 |
| 147 // AddTask submits given tasks to an appropriate task queue. | 197 // runBatchesPerQueue is a generic parallel task distributor. It solves the |
| 198 // problems that: |
| 199 //» - "tasks" may be assigned to different queues, and tasks assigned to the |
| 200 // same queue should be batched together. |
| 201 //» - Any given batch may exceed queue operation limits, and thus needs to b
e |
| 202 //» broken into multiple operations on sub-batches. |
| 148 // | 203 // |
| 149 // It means, at some later time in some other GAE process, callbacks registered | 204 // fn is called for each sub-batch assigned to each queue. All resulting errors |
| 150 // as handlers for corresponding proto types will be called. | 205 // are then flattened. If no fn invocation returns any errors, nil will be |
| 151 // | 206 // returned. If a single error is returned, this function will return that |
| 152 // If the given context is transactional, inherits the transaction. Note if | 207 // error. If an errors.MultiError is returned, it and any embedded |
| 153 // running outside of a transaction and multiple tasks are passed, the operation | 208 // MultiError (recursively) will be flattened into a single MultiError |
| 154 // is not atomic: it returns an error if at least one enqueue operation failed | 209 // containing only the non-nil errors. This simplifies user expectations. |
| 155 // (there's no way to figure out which one exactly). | 210 func (d *Dispatcher) runBatchesPerQueue(c context.Context, tasks []*Task, |
| 156 // | 211 » fn func(c context.Context, queue string, tasks []*taskqueue.Task) error)
error { |
| 157 // Returns only transient errors. Unlike regular Task Queue's Add, | 212 |
| 158 // ErrTaskAlreadyAdded is not considered an error. | |
| 159 func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error { | |
| 160 if len(tasks) == 0 { | 213 if len(tasks) == 0 { |
| 161 return nil | 214 return nil |
| 162 } | 215 } |
| 163 | 216 |
| 164 // Handle the most common case of one task in a more efficient way. | 217 // Handle the most common case of one task in a more efficient way. |
| 165 if len(tasks) == 1 { | 218 if len(tasks) == 1 { |
| 166 t, queue, err := d.tqTask(tasks[0]) | 219 t, queue, err := d.tqTask(tasks[0]) |
| 167 if err != nil { | 220 if err != nil { |
| 168 return err | 221 return err |
| 169 } | 222 } |
| 170 » » if err := taskqueue.Add(c, queue, t); err != nil { | 223 » » if err := fn(c, queue, []*taskqueue.Task{t}); err != nil { |
| 171 » » » if err == taskqueue.ErrTaskAlreadyAdded { | |
| 172 » » » » return nil | |
| 173 » » » } | |
| 174 return transient.Tag.Apply(err) | 224 return transient.Tag.Apply(err) |
| 175 } | 225 } |
| 176 return nil | 226 return nil |
| 177 } | 227 } |
| 178 | 228 |
| 179 perQueue := map[string][]*taskqueue.Task{} | 229 perQueue := map[string][]*taskqueue.Task{} |
| 180 for _, task := range tasks { | 230 for _, task := range tasks { |
| 181 t, queue, err := d.tqTask(task) | 231 t, queue, err := d.tqTask(task) |
| 182 if err != nil { | 232 if err != nil { |
| 183 return err | 233 return err |
| 184 } | 234 } |
| 185 perQueue[queue] = append(perQueue[queue], t) | 235 perQueue[queue] = append(perQueue[queue], t) |
| 186 } | 236 } |
| 187 | 237 |
| 188 // Enqueue in parallel, per-queue, split into batches based on Task Queu
e | 238 // Enqueue in parallel, per-queue, split into batches based on Task Queu
e |
| 189 // RPC limits (100 tasks per batch). | 239 // RPC limits (100 tasks per batch). |
| 240 const maxBatchSize = 100 |
| 190 errs := make(chan error) | 241 errs := make(chan error) |
| 191 ops := 0 | 242 ops := 0 |
| 192 for q, tasks := range perQueue { | 243 for q, tasks := range perQueue { |
| 193 for len(tasks) > 0 { | 244 for len(tasks) > 0 { |
| 194 » » » count := 100 | 245 » » » count := maxBatchSize |
| 195 if count > len(tasks) { | 246 if count > len(tasks) { |
| 196 count = len(tasks) | 247 count = len(tasks) |
| 197 } | 248 } |
| 198 go func(q string, batch []*taskqueue.Task) { | 249 go func(q string, batch []*taskqueue.Task) { |
| 199 » » » » errs <- taskqueue.Add(c, q, batch...) | 250 » » » » errs <- fn(c, q, batch) |
| 200 }(q, tasks[:count]) | 251 }(q, tasks[:count]) |
| 201 tasks = tasks[count:] | 252 tasks = tasks[count:] |
| 202 ops++ | 253 ops++ |
| 203 } | 254 } |
| 204 } | 255 } |
| 205 | 256 |
| 206 » // Gather all errors throwing away ErrTaskAlreadyAdded. | 257 » all := errors.NewLazyMultiError(ops) |
| 207 » var all errors.MultiError | |
| 208 for i := 0; i < ops; i++ { | 258 for i := 0; i < ops; i++ { |
| 209 err := <-errs | 259 err := <-errs |
| 210 » » if merr, yep := err.(errors.MultiError); yep { | 260 » » if err != nil { |
| 211 » » » for _, e := range merr { | 261 » » » all.Assign(i, err) |
| 212 » » » » if e != nil && e != taskqueue.ErrTaskAlreadyAdde
d { | |
| 213 » » » » » all = append(all, e) | |
| 214 » » » » } | |
| 215 » » » } | |
| 216 » » } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded { | |
| 217 » » » all = append(all, err) | |
| 218 } | 262 } |
| 219 } | 263 } |
| 220 | 264 |
| 221 » if len(all) == 0 { | 265 » if err := flattenErrors(all.Get()); err != nil { |
| 266 » » return transient.Tag.Apply(err) |
| 267 » } |
| 268 » return nil |
| 269 } |
| 270 |
| 271 // AddTask submits given tasks to an appropriate task queue. |
| 272 // |
| 273 // It means, at some later time in some other GAE process, callbacks registered |
| 274 // as handlers for corresponding proto types will be called. |
| 275 // |
| 276 // If the given context is transactional or namespaced, inherits the |
| 277 // transaction/namespace. Note if running outside of a transaction and multiple |
| 278 // tasks are passed, the operation is not atomic: it returns an error if at |
| 279 // least one enqueue operation failed (there's no way to figure out which one |
| 280 // exactly). |
| 281 // |
| 282 // Returns only transient errors. Unlike regular Task Queue's Add, |
| 283 // ErrTaskAlreadyAdded is not considered an error. |
| 284 func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error { |
| 285 » return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue stri
ng, tasks []*taskqueue.Task) error { |
| 286 » » if err := taskqueue.Add(c, queue, tasks...); err != nil { |
| 287 » » » return errors.Filter(err, taskqueue.ErrTaskAlreadyAdded) |
| 288 » » } |
| 222 return nil | 289 return nil |
| 223 » } | 290 » }) |
| 291 } |
| 224 | 292 |
| 225 » return transient.Tag.Apply(all) | 293 // DeleteTask deletes the specified tasks from their queues. |
| 294 // |
| 295 // If the given context is transactional or namespaced, inherits the |
| 296 // transaction/namespace. Note if running outside of a transaction and multiple |
| 297 // tasks are passed, the operation is not atomic: it returns an error if at |
| 298 // least one enqueue operation failed (there's no way to figure out which one |
| 299 // exactly). |
| 300 // |
| 301 // Returns only transient errors. Unlike regular Task Queue's Delete, |
| 302 // attempts to delete an unknown or tombstoned task are not considered errors. |
| 303 func (d *Dispatcher) DeleteTask(c context.Context, tasks ...*Task) error { |
| 304 » return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue stri
ng, tasks []*taskqueue.Task) error { |
| 305 » » return errors.FilterFunc(taskqueue.Delete(c, queue, tasks...), f
unc(err error) bool { |
| 306 » » » // Currently, the best way to detect an attempt to delet
e an unknown task |
| 307 » » » // is to check the string with tolerable error message p
hrases. |
| 308 » » » for _, phrase := range []string{"UNKNOWN_TASK", "TOMBSTO
NED_TASK"} { |
| 309 » » » » if strings.Contains(err.Error(), phrase) { |
| 310 » » » » » return true |
| 311 » » » » } |
| 312 » » » } |
| 313 » » » return false |
| 314 » » }) |
| 315 » }) |
| 226 } | 316 } |
| 227 | 317 |
| 228 // InstallRoutes installs appropriate HTTP routes in the router. | 318 // InstallRoutes installs appropriate HTTP routes in the router. |
| 229 // | 319 // |
| 230 // Must be called only after all task handlers are registered! | 320 // Must be called only after all task handlers are registered! |
| 231 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain)
{ | 321 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain)
{ |
| 232 queues := stringset.New(0) | 322 queues := stringset.New(0) |
| 233 | 323 |
| 234 d.mu.RLock() | 324 d.mu.RLock() |
| 235 for _, h := range d.handlers { | 325 for _, h := range d.handlers { |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 269 title := handler.typeName | 359 title := handler.typeName |
| 270 if task.Title != "" { | 360 if task.Title != "" { |
| 271 title = task.Title | 361 title = task.Title |
| 272 } | 362 } |
| 273 | 363 |
| 274 retryOpts := handler.retryOpts | 364 retryOpts := handler.retryOpts |
| 275 if task.RetryOptions != nil { | 365 if task.RetryOptions != nil { |
| 276 retryOpts = task.RetryOptions | 366 retryOpts = task.RetryOptions |
| 277 } | 367 } |
| 278 | 368 |
| 279 // There's some weird restrictions on what characters are allowed inside
task | |
| 280 // names. Lexicographically close names also cause hot spot problems in
the | |
| 281 // Task Queues backend. To avoid these two issues, we always use SHA256
hashes | |
| 282 // as task names. Also each task kind owns its own namespace of deduplic
ation | |
| 283 // keys, so add task type to the digest as well. | |
| 284 name := "" | |
| 285 if task.DeduplicationKey != "" { | |
| 286 h := sha256.New() | |
| 287 h.Write([]byte(handler.typeName)) | |
| 288 h.Write([]byte{0}) | |
| 289 h.Write([]byte(task.DeduplicationKey)) | |
| 290 name = hex.EncodeToString(h.Sum(nil)) | |
| 291 } | |
| 292 | |
| 293 return &taskqueue.Task{ | 369 return &taskqueue.Task{ |
| 294 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue,
title), | 370 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue,
title), |
| 295 » » Name: name, | 371 » » Name: task.Name(), |
| 296 Method: "POST", | 372 Method: "POST", |
| 297 Payload: blob, | 373 Payload: blob, |
| 298 ETA: task.ETA, | 374 ETA: task.ETA, |
| 299 Delay: task.Delay, | 375 Delay: task.Delay, |
| 300 RetryOptions: retryOpts, | 376 RetryOptions: retryOpts, |
| 301 }, handler.queue, nil | 377 }, handler.queue, nil |
| 302 } | 378 } |
| 303 | 379 |
| 304 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher. | 380 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher. |
| 305 // | 381 // |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 407 return nil, fmt.Errorf("no task body given") | 483 return nil, fmt.Errorf("no task body given") |
| 408 } | 484 } |
| 409 | 485 |
| 410 task := reflect.New(tp.Elem()).Interface().(proto.Message) | 486 task := reflect.New(tp.Elem()).Interface().(proto.Message) |
| 411 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil
{ | 487 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil
{ |
| 412 return nil, err | 488 return nil, err |
| 413 } | 489 } |
| 414 | 490 |
| 415 return task, nil | 491 return task, nil |
| 416 } | 492 } |
| 493 |
| 494 //////////////////////////////////////////////////////////////////////////////// |
| 495 |
| 496 // flattenErrors collapses a multi-dimensional MultiError space into a flat |
| 497 // MultiError, removing "nil" errors. |
| 498 // |
| 499 // If err is not an errors.MultiError, will return err directly. |
| 500 // |
| 501 // As a special case, if merr contains no non-nil errors, nil will be returned. |
| 502 func flattenErrors(err error) error { |
| 503 var ret errors.MultiError |
| 504 flattenErrorsRec(&ret, err) |
| 505 if len(ret) == 0 { |
| 506 return nil |
| 507 } |
| 508 return ret |
| 509 } |
| 510 |
| 511 func flattenErrorsRec(ret *errors.MultiError, err error) { |
| 512 switch et := err.(type) { |
| 513 case nil: |
| 514 case errors.MultiError: |
| 515 for _, e := range et { |
| 516 flattenErrorsRec(ret, e) |
| 517 } |
| 518 default: |
| 519 *ret = append(*ret, et) |
| 520 } |
| 521 } |
| OLD | NEW |