Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright 2017 The LUCI Authors. | 1 // Copyright 2017 The LUCI Authors. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 52 } | 52 } |
| 53 | 53 |
| 54 // Task contains task body and additional parameters that influence how it is | 54 // Task contains task body and additional parameters that influence how it is |
| 55 // routed. | 55 // routed. |
| 56 type Task struct { | 56 type Task struct { |
| 57 // Payload is task's payload as well as indicator of its type. | 57 // Payload is task's payload as well as indicator of its type. |
| 58 // | 58 // |
| 59 // Tasks are routed based on type of the payload message, see RegisterTa sk. | 59 // Tasks are routed based on type of the payload message, see RegisterTa sk. |
| 60 Payload proto.Message | 60 Payload proto.Message |
| 61 | 61 |
| 62 // NamePrefix, if not empty, is a string that will be prefixed to the ta sk's | |
| 63 // name. Characters in NamePrefix must be appropriate task queue name | |
| 64 // characters. NamePrefix can be useful because the Task Queue system al lows | |
| 65 // users to search for tasks by prefix. | |
| 66 // | |
| 67 // Lexicographically close names can cause hot spots in the Task Queues | |
| 68 // backend. If NamePrefix is specified, users should try and ensure that | |
| 69 // it is friendly to sharding (e.g., begins with a hash string). | |
| 70 // | |
| 71 // Setting NamePrefix and/or DeduplicationKey will result in a named tas k | |
| 72 // being generated. This task can be cancelled using DeleteTask. | |
| 73 NamePrefix string | |
| 74 | |
| 62 // DeduplicationKey is optional unique key of the task. | 75 // DeduplicationKey is optional unique key of the task. |
| 63 // | 76 // |
| 64 // If a task of a given proto type with a given key has already been enq ueued | 77 // If a task of a given proto type with a given key has already been enq ueued |
| 65 // recently, this task will be silently ignored. | 78 // recently, this task will be silently ignored. |
| 66 // | 79 // |
| 67 // Such tasks can only be used outside of transactions. | 80 // Such tasks can only be used outside of transactions. |
| 81 // | |
| 82 // Setting NamePrefix and/or DeduplicationKey will result in a named tas k | |
| 83 // being generated. This task can be cancelled using DeleteTask. | |
| 68 DeduplicationKey string | 84 DeduplicationKey string |
| 69 | 85 |
| 70 // Title is optional string that identifies the task in HTTP logs. | 86 // Title is optional string that identifies the task in HTTP logs. |
| 71 // | 87 // |
| 72 // It will show up as a suffix in task handler URL. It exists exclusivel y to | 88 // It will show up as a suffix in task handler URL. It exists exclusivel y to |
| 73 // simplify reading HTTP logs. It serves no other purpose! In particular , | 89 // simplify reading HTTP logs. It serves no other purpose! In particular , |
| 74 // it is NOT a task name. | 90 // it is NOT a task name. |
| 75 // | 91 // |
| 76 // Handlers won't ever see it. Pass all information through the task bod y. | 92 // Handlers won't ever see it. Pass all information through the task bod y. |
| 77 Title string | 93 Title string |
| 78 | 94 |
| 79 // Delay specifies the duration the task queue service must wait before | 95 // Delay specifies the duration the task queue service must wait before |
| 80 // executing the task. | 96 // executing the task. |
| 81 // | 97 // |
| 82 // Either Delay or ETA may be set, but not both. | 98 // Either Delay or ETA may be set, but not both. |
| 83 Delay time.Duration | 99 Delay time.Duration |
| 84 | 100 |
| 85 // ETA specifies the earliest time a task may be executed. | 101 // ETA specifies the earliest time a task may be executed. |
| 86 // | 102 // |
| 87 // Either Delay or ETA may be set, but not both. | 103 // Either Delay or ETA may be set, but not both. |
| 88 ETA time.Time | 104 ETA time.Time |
| 89 | 105 |
| 90 // Retry options for this task. | 106 // Retry options for this task. |
| 91 // | 107 // |
| 92 // If given, overrides default options set when this task was registered . | 108 // If given, overrides default options set when this task was registered . |
| 93 RetryOptions *taskqueue.RetryOptions | 109 RetryOptions *taskqueue.RetryOptions |
| 94 } | 110 } |
| 95 | 111 |
| 112 // Name generates and returns the task's name. | |
| 113 // | |
| 114 // If the task is not a named task (doesn't have NamePrefix or DeduplicationKey | |
| 115 // set), this will return an empty string. | |
| 116 func (task *Task) Name() string { | |
| 117 if task.NamePrefix == "" && task.DeduplicationKey == "" { | |
| 118 return "" | |
| 119 } | |
| 120 | |
| 121 parts := make([]string, 0, 2) | |
| 122 | |
| 123 if task.NamePrefix != "" { | |
| 124 parts = append(parts, task.NamePrefix) | |
| 125 } | |
| 126 | |
| 127 // There's some weird restrictions on what characters are allowed inside task | |
| 128 // names. Lexicographically close names also cause hot spot problems in the | |
| 129 // Task Queues backend. To avoid these two issues, we always use SHA256 hashes | |
| 130 // as task names. Also each task kind owns its own namespace of deduplic ation | |
| 131 // keys, so add task type to the digest as well. | |
| 132 if task.DeduplicationKey != "" { | |
| 133 h := sha256.New() | |
| 134 if task.Payload != nil { | |
|
Vadim Sh.
2017/08/03 20:12:41
tasks without payload are meaningless in this mode
dnj
2017/08/03 20:56:27
No, I suppose a panic is appropriate.
| |
| 135 h.Write([]byte(proto.MessageName(task.Payload))) | |
| 136 } | |
| 137 h.Write([]byte{0}) | |
| 138 h.Write([]byte(task.DeduplicationKey)) | |
| 139 parts = append(parts, hex.EncodeToString(h.Sum(nil))) | |
| 140 } | |
| 141 | |
| 142 return strings.Join(parts, "") | |
|
Vadim Sh.
2017/08/03 20:12:41
nit: let's use '-' as separator (if it's allowed i
dnj
2017/08/03 20:56:27
It is allowed. OK.
| |
| 143 } | |
| 144 | |
| 96 // Handler is called to handle one enqueued task. | 145 // Handler is called to handle one enqueued task. |
| 97 // | 146 // |
| 98 // The passed context is produced by a middleware chain installed with | 147 // The passed context is produced by a middleware chain installed with |
| 99 // InstallHandlers. | 148 // InstallHandlers. |
| 100 // | 149 // |
| 101 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is | 150 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is |
| 102 // 1 on first execution attempt, 2 on a retry, and so on. | 151 // 1 on first execution attempt, 2 on a retry, and so on. |
| 103 // | 152 // |
| 104 // May return transient errors. In this case, task queue may attempt to | 153 // May return transient errors. In this case, task queue may attempt to |
| 105 // redeliver the task (depending on RetryOptions). | 154 // redeliver the task (depending on RetryOptions). |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 137 } | 186 } |
| 138 | 187 |
| 139 d.handlers[name] = handler{ | 188 d.handlers[name] = handler{ |
| 140 cb: cb, | 189 cb: cb, |
| 141 typeName: name, | 190 typeName: name, |
| 142 queue: queue, | 191 queue: queue, |
| 143 retryOpts: opts, | 192 retryOpts: opts, |
| 144 } | 193 } |
| 145 } | 194 } |
| 146 | 195 |
| 147 // AddTask submits given tasks to an appropriate task queue. | 196 // runBatchesPerQueue is a generic parallel task distributor. It solves the |
| 197 // problems that: | |
| 198 //» - "tasks" may be assigned to different queues, and tasks assigned to the | |
| 199 // same queue should be batched together. | |
| 200 //» - Any given batch may exceed queue operation limits, and thus needs to b e | |
| 201 //» broken into multiple operations on sub-batches. | |
| 148 // | 202 // |
| 149 // It means, at some later time in some other GAE process, callbacks registered | 203 // fn is called for each sub-batch assigned to each queue. All resulting errors |
| 150 // as handlers for corresponding proto types will be called. | 204 // are then flattened. If no fn invocation returns any errors, nil will be |
| 151 // | 205 // returned. If a single error is returned, this function will return that |
| 152 // If the given context is transactional, inherits the transaction. Note if | 206 // error. If an errors.MultiError is returned, it and any embedded |
| 153 // running outside of a transaction and multiple tasks are passed, the operation | 207 // MultiError (recursively) will be flattened into a single MultiError |
| 154 // is not atomic: it returns an error if at least one enqueue operation failed | 208 // containing only the non-nil errors. This simplifies user expectations. |
|
Vadim Sh.
2017/08/03 20:12:41
I did it this way because I had never seen we actu
dnj
2017/08/03 20:56:27
I think it's fine. If a user needs something more,
Vadim Sh.
2017/08/03 21:04:05
I'm just explaining why the code wasn't rigorous h
| |
| 155 // (there's no way to figure out which one exactly). | 209 func (d *Dispatcher) runBatchesPerQueue(c context.Context, tasks []*Task, |
| 156 // | 210 » fn func(c context.Context, queue string, tasks []*taskqueue.Task) error) error { |
| 157 // Returns only transient errors. Unlike regular Task Queue's Add, | 211 |
| 158 // ErrTaskAlreadyAdded is not considered an error. | |
| 159 func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error { | |
| 160 if len(tasks) == 0 { | 212 if len(tasks) == 0 { |
| 161 return nil | 213 return nil |
| 162 } | 214 } |
| 163 | 215 |
| 164 // Handle the most common case of one task in a more efficient way. | 216 // Handle the most common case of one task in a more efficient way. |
| 165 if len(tasks) == 1 { | 217 if len(tasks) == 1 { |
| 166 t, queue, err := d.tqTask(tasks[0]) | 218 t, queue, err := d.tqTask(tasks[0]) |
| 167 if err != nil { | 219 if err != nil { |
| 168 return err | 220 return err |
| 169 } | 221 } |
| 170 » » if err := taskqueue.Add(c, queue, t); err != nil { | 222 » » if err := fn(c, queue, []*taskqueue.Task{t}); err != nil { |
| 171 » » » if err == taskqueue.ErrTaskAlreadyAdded { | |
| 172 » » » » return nil | |
| 173 » » » } | |
| 174 return transient.Tag.Apply(err) | 223 return transient.Tag.Apply(err) |
| 175 } | 224 } |
| 176 return nil | 225 return nil |
| 177 } | 226 } |
| 178 | 227 |
| 179 perQueue := map[string][]*taskqueue.Task{} | 228 perQueue := map[string][]*taskqueue.Task{} |
| 180 for _, task := range tasks { | 229 for _, task := range tasks { |
| 181 t, queue, err := d.tqTask(task) | 230 t, queue, err := d.tqTask(task) |
| 182 if err != nil { | 231 if err != nil { |
| 183 return err | 232 return err |
| 184 } | 233 } |
| 185 perQueue[queue] = append(perQueue[queue], t) | 234 perQueue[queue] = append(perQueue[queue], t) |
| 186 } | 235 } |
| 187 | 236 |
| 188 // Enqueue in parallel, per-queue, split into batches based on Task Queu e | 237 // Enqueue in parallel, per-queue, split into batches based on Task Queu e |
| 189 // RPC limits (100 tasks per batch). | 238 // RPC limits (100 tasks per batch). |
| 239 const maxBatchSize = 100 | |
| 190 errs := make(chan error) | 240 errs := make(chan error) |
| 191 ops := 0 | 241 ops := 0 |
| 192 for q, tasks := range perQueue { | 242 for q, tasks := range perQueue { |
| 193 for len(tasks) > 0 { | 243 for len(tasks) > 0 { |
| 194 » » » count := 100 | 244 » » » count := maxBatchSize |
| 195 if count > len(tasks) { | 245 if count > len(tasks) { |
| 196 count = len(tasks) | 246 count = len(tasks) |
| 197 } | 247 } |
| 198 go func(q string, batch []*taskqueue.Task) { | 248 go func(q string, batch []*taskqueue.Task) { |
| 199 » » » » errs <- taskqueue.Add(c, q, batch...) | 249 » » » » errs <- fn(c, q, batch) |
| 200 }(q, tasks[:count]) | 250 }(q, tasks[:count]) |
| 201 tasks = tasks[count:] | 251 tasks = tasks[count:] |
| 202 ops++ | 252 ops++ |
| 203 } | 253 } |
| 204 } | 254 } |
| 205 | 255 |
| 206 » // Gather all errors throwing away ErrTaskAlreadyAdded. | 256 » all := errors.NewLazyMultiError(ops) |
| 207 » var all errors.MultiError | |
| 208 for i := 0; i < ops; i++ { | 257 for i := 0; i < ops; i++ { |
| 209 err := <-errs | 258 err := <-errs |
| 210 » » if merr, yep := err.(errors.MultiError); yep { | 259 » » if err != nil { |
| 211 » » » for _, e := range merr { | 260 » » » all.Assign(i, err) |
| 212 » » » » if e != nil && e != taskqueue.ErrTaskAlreadyAdde d { | |
| 213 » » » » » all = append(all, e) | |
| 214 » » » » } | |
| 215 » » » } | |
| 216 » » } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded { | |
| 217 » » » all = append(all, err) | |
| 218 } | 261 } |
| 219 } | 262 } |
| 220 | 263 |
| 221 » if len(all) == 0 { | 264 » if err := flattenErrors(all.Get()); err != nil { |
| 265 » » return transient.Tag.Apply(err) | |
| 266 » } | |
| 267 » return nil | |
| 268 } | |
| 269 | |
| 270 // AddTask submits given tasks to an appropriate task queue. | |
| 271 // | |
| 272 // It means, at some later time in some other GAE process, callbacks registered | |
| 273 // as handlers for corresponding proto types will be called. | |
| 274 // | |
| 275 // If the given context is transactional or namespaced, inherits the | |
| 276 // transaction/namespace. Note if running outside of a transaction and multiple | |
| 277 // tasks are passed, the operation is not atomic: it returns an error if at | |
| 278 // least one enqueue operation failed (there's no way to figure out which one | |
| 279 // exactly). | |
| 280 // | |
| 281 // Returns only transient errors. Unlike regular Task Queue's Add, | |
| 282 // ErrTaskAlreadyAdded is not considered an error. | |
| 283 func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error { | |
| 284 » return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue stri ng, tasks []*taskqueue.Task) error { | |
| 285 » » if err := taskqueue.Add(c, queue, tasks...); err != nil { | |
| 286 » » » return errors.Filter(err, taskqueue.ErrTaskAlreadyAdded) | |
|
Vadim Sh.
2017/08/03 20:12:41
does this handle the case when err is taskqueue.Er
dnj
2017/08/03 20:56:27
yep
| |
| 287 » » } | |
| 222 return nil | 288 return nil |
| 223 » } | 289 » }) |
| 290 } | |
| 224 | 291 |
| 225 » return transient.Tag.Apply(all) | 292 // DeleteTask deletes the specified tasks from their queues. |
| 293 // | |
| 294 // If the given context is transactional or namespaced, inherits the | |
| 295 // transaction/namespace. Note if running outside of a transaction and multiple | |
| 296 // tasks are passed, the operation is not atomic: it returns an error if at | |
| 297 // least one enqueue operation failed (there's no way to figure out which one | |
| 298 // exactly). | |
| 299 // | |
| 300 // Returns only transient errors. Unlike regular Task Queue's Delete, | |
| 301 // attempts to delete an unknown or tombstoned task are not considered errors. | |
| 302 func (d *Dispatcher) DeleteTask(c context.Context, tasks ...*Task) error { | |
| 303 » return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue stri ng, tasks []*taskqueue.Task) error { | |
| 304 » » return errors.FilterFunc(taskqueue.Delete(c, queue, tasks...), f unc(err error) bool { | |
| 305 » » » // Currently, the best way to detect an attempt to delet e an unknown task | |
| 306 » » » // is to check the string with tolerable error message p hrases. | |
| 307 » » » for _, phrase := range []string{"UNKNOWN_TASK", "TOMBSTO NED_TASK"} { | |
| 308 » » » » if strings.Contains(err.Error(), phrase) { | |
| 309 » » » » » return true | |
| 310 » » » » } | |
| 311 » » » } | |
| 312 » » » return false | |
| 313 » » }) | |
| 314 » }) | |
| 226 } | 315 } |
| 227 | 316 |
| 228 // InstallRoutes installs appropriate HTTP routes in the router. | 317 // InstallRoutes installs appropriate HTTP routes in the router. |
| 229 // | 318 // |
| 230 // Must be called only after all task handlers are registered! | 319 // Must be called only after all task handlers are registered! |
| 231 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) { | 320 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) { |
| 232 queues := stringset.New(0) | 321 queues := stringset.New(0) |
| 233 | 322 |
| 234 d.mu.RLock() | 323 d.mu.RLock() |
| 235 for _, h := range d.handlers { | 324 for _, h := range d.handlers { |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 269 title := handler.typeName | 358 title := handler.typeName |
| 270 if task.Title != "" { | 359 if task.Title != "" { |
| 271 title = task.Title | 360 title = task.Title |
| 272 } | 361 } |
| 273 | 362 |
| 274 retryOpts := handler.retryOpts | 363 retryOpts := handler.retryOpts |
| 275 if task.RetryOptions != nil { | 364 if task.RetryOptions != nil { |
| 276 retryOpts = task.RetryOptions | 365 retryOpts = task.RetryOptions |
| 277 } | 366 } |
| 278 | 367 |
| 279 // There's some weird restrictions on what characters are allowed inside task | |
| 280 // names. Lexicographically close names also cause hot spot problems in the | |
| 281 // Task Queues backend. To avoid these two issues, we always use SHA256 hashes | |
| 282 // as task names. Also each task kind owns its own namespace of deduplic ation | |
| 283 // keys, so add task type to the digest as well. | |
| 284 name := "" | |
| 285 if task.DeduplicationKey != "" { | |
| 286 h := sha256.New() | |
| 287 h.Write([]byte(handler.typeName)) | |
| 288 h.Write([]byte{0}) | |
| 289 h.Write([]byte(task.DeduplicationKey)) | |
| 290 name = hex.EncodeToString(h.Sum(nil)) | |
| 291 } | |
| 292 | |
| 293 return &taskqueue.Task{ | 368 return &taskqueue.Task{ |
| 294 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title), | 369 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title), |
| 295 » » Name: name, | 370 » » Name: task.Name(), |
| 296 Method: "POST", | 371 Method: "POST", |
| 297 Payload: blob, | 372 Payload: blob, |
| 298 ETA: task.ETA, | 373 ETA: task.ETA, |
| 299 Delay: task.Delay, | 374 Delay: task.Delay, |
| 300 RetryOptions: retryOpts, | 375 RetryOptions: retryOpts, |
| 301 }, handler.queue, nil | 376 }, handler.queue, nil |
| 302 } | 377 } |
| 303 | 378 |
| 304 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher. | 379 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher. |
| 305 // | 380 // |
| (...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 407 return nil, fmt.Errorf("no task body given") | 482 return nil, fmt.Errorf("no task body given") |
| 408 } | 483 } |
| 409 | 484 |
| 410 task := reflect.New(tp.Elem()).Interface().(proto.Message) | 485 task := reflect.New(tp.Elem()).Interface().(proto.Message) |
| 411 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil { | 486 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil { |
| 412 return nil, err | 487 return nil, err |
| 413 } | 488 } |
| 414 | 489 |
| 415 return task, nil | 490 return task, nil |
| 416 } | 491 } |
| 492 | |
| 493 //////////////////////////////////////////////////////////////////////////////// | |
| 494 | |
| 495 // flattenErrors collapses a multi-dimensional MultiError space into a flat | |
| 496 // MultiError, removing "nil" errors. | |
| 497 // | |
| 498 // If err is not an errors.MultiError, will return err directly. | |
| 499 // | |
| 500 // As a special case, if merr contains no non-nil errors, nil will be returned. | |
| 501 func flattenErrors(err error) error { | |
| 502 var ret errors.MultiError | |
| 503 flattenErrorsRec(&ret, err) | |
| 504 if len(ret) == 0 { | |
| 505 return nil | |
| 506 } | |
| 507 return ret | |
| 508 } | |
| 509 | |
| 510 func flattenErrorsRec(ret *errors.MultiError, err error) { | |
| 511 switch et := err.(type) { | |
| 512 case nil: | |
| 513 case errors.MultiError: | |
| 514 for _, e := range et { | |
| 515 flattenErrorsRec(ret, e) | |
| 516 } | |
| 517 default: | |
| 518 *ret = append(*ret, et) | |
| 519 } | |
| 520 } | |
| OLD | NEW |