Chromium Code Reviews| Index: appengine/tq/tq.go |
| diff --git a/appengine/tq/tq.go b/appengine/tq/tq.go |
| index 6f974ddf8d03bde018da16042fa4c11f7f526bec..5d05568bba659995d070994a52ba17473aa202ab 100644 |
| --- a/appengine/tq/tq.go |
| +++ b/appengine/tq/tq.go |
| @@ -59,12 +59,28 @@ type Task struct { |
| // Tasks are routed based on type of the payload message, see RegisterTask. |
| Payload proto.Message |
| + // NamePrefix, if not empty, is a string that will be prefixed to the task's |
| + // name. Characters in NamePrefix must be appropriate task queue name |
| + // characters. NamePrefix can be useful because the Task Queue system allows |
| + // users to search for tasks by prefix. |
| + // |
| + // Lexicographically close names can cause hot spots in the Task Queues |
| + // backend. If NamePrefix is specified, users should try and ensure that |
| + // it is friendly to sharding (e.g., begins with a hash string). |
| + // |
| + // Setting NamePrefix and/or DeduplicationKey will result in a named task |
| + // being generated. This task can be cancelled using DeleteTask. |
| + NamePrefix string |
| + |
| // DeduplicationKey is optional unique key of the task. |
| // |
| // If a task of a given proto type with a given key has already been enqueued |
| // recently, this task will be silently ignored. |
| // |
| // Such tasks can only be used outside of transactions. |
| + // |
| + // Setting NamePrefix and/or DeduplicationKey will result in a named task |
| + // being generated. This task can be cancelled using DeleteTask. |
| DeduplicationKey string |
| // Title is optional string that identifies the task in HTTP logs. |
| @@ -93,6 +109,39 @@ type Task struct { |
| RetryOptions *taskqueue.RetryOptions |
| } |
| +// Name generates and returns the task's name. |
| +// |
| +// If the task is not a named task (doesn't have NamePrefix or DeduplicationKey |
| +// set), this will return an empty string. |
| +func (task *Task) Name() string { |
| + if task.NamePrefix == "" && task.DeduplicationKey == "" { |
| + return "" |
| + } |
| + |
| + parts := make([]string, 0, 2) |
| + |
| + if task.NamePrefix != "" { |
| + parts = append(parts, task.NamePrefix) |
| + } |
| + |
| + // There's some weird restrictions on what characters are allowed inside task |
| + // names. Lexicographically close names also cause hot spot problems in the |
| + // Task Queues backend. To avoid these two issues, we always use SHA256 hashes |
| + // as task names. Also each task kind owns its own namespace of deduplication |
| + // keys, so add task type to the digest as well. |
| + if task.DeduplicationKey != "" { |
| + h := sha256.New() |
| + if task.Payload != nil { |
|
Vadim Sh.
2017/08/03 20:12:41
tasks without payload are meaningless in this mode
dnj
2017/08/03 20:56:27
No, I suppose a panic is appropriate.
|
| + h.Write([]byte(proto.MessageName(task.Payload))) |
| + } |
| + h.Write([]byte{0}) |
| + h.Write([]byte(task.DeduplicationKey)) |
| + parts = append(parts, hex.EncodeToString(h.Sum(nil))) |
| + } |
| + |
| + return strings.Join(parts, "") |
|
Vadim Sh.
2017/08/03 20:12:41
nit: let's use '-' as separator (if it's allowed i
dnj
2017/08/03 20:56:27
It is allowed. OK.
|
| +} |
| + |
| // Handler is called to handle one enqueued task. |
| // |
| // The passed context is produced by a middleware chain installed with |
| @@ -144,19 +193,22 @@ func (d *Dispatcher) RegisterTask(prototype proto.Message, cb Handler, queue str |
| } |
| } |
| -// AddTask submits given tasks to an appropriate task queue. |
| -// |
| -// It means, at some later time in some other GAE process, callbacks registered |
| -// as handlers for corresponding proto types will be called. |
| -// |
| -// If the given context is transactional, inherits the transaction. Note if |
| -// running outside of a transaction and multiple tasks are passed, the operation |
| -// is not atomic: it returns an error if at least one enqueue operation failed |
| -// (there's no way to figure out which one exactly). |
| +// runBatchesPerQueue is a generic parallel task distributor. It solves the |
| +// problems that: |
| +// - "tasks" may be assigned to different queues, and tasks assigned to the |
| +// same queue should be batched together. |
| +// - Any given batch may exceed queue operation limits, and thus needs to be |
| +// broken into multiple operations on sub-batches. |
| // |
| -// Returns only transient errors. Unlike regular Task Queue's Add, |
| -// ErrTaskAlreadyAdded is not considered an error. |
| -func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error { |
| +// fn is called for each sub-batch assigned to each queue. All resulting errors |
| +// are then flattened. If no fn invocation returns any errors, nil will be |
| +// returned. If a single error is returned, this function will return that |
| +// error. If an errors.MultiError is returned, it and any embedded |
| +// MultiError (recursively) will be flattened into a single MultiError |
| +// containing only the non-nil errors. This simplifies user expectations. |
|
Vadim Sh.
2017/08/03 20:12:41
I did it this way because I had never seen we actu
dnj
2017/08/03 20:56:27
I think it's fine. If a user needs something more,
Vadim Sh.
2017/08/03 21:04:05
I'm just explaining why the code wasn't rigorous h
|
| +func (d *Dispatcher) runBatchesPerQueue(c context.Context, tasks []*Task, |
| + fn func(c context.Context, queue string, tasks []*taskqueue.Task) error) error { |
| + |
| if len(tasks) == 0 { |
| return nil |
| } |
| @@ -167,10 +219,7 @@ func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error { |
| if err != nil { |
| return err |
| } |
| - if err := taskqueue.Add(c, queue, t); err != nil { |
| - if err == taskqueue.ErrTaskAlreadyAdded { |
| - return nil |
| - } |
| + if err := fn(c, queue, []*taskqueue.Task{t}); err != nil { |
| return transient.Tag.Apply(err) |
| } |
| return nil |
| @@ -187,42 +236,82 @@ func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error { |
| // Enqueue in parallel, per-queue, split into batches based on Task Queue |
| // RPC limits (100 tasks per batch). |
| + const maxBatchSize = 100 |
| errs := make(chan error) |
| ops := 0 |
| for q, tasks := range perQueue { |
| for len(tasks) > 0 { |
| - count := 100 |
| + count := maxBatchSize |
| if count > len(tasks) { |
| count = len(tasks) |
| } |
| go func(q string, batch []*taskqueue.Task) { |
| - errs <- taskqueue.Add(c, q, batch...) |
| + errs <- fn(c, q, batch) |
| }(q, tasks[:count]) |
| tasks = tasks[count:] |
| ops++ |
| } |
| } |
| - // Gather all errors throwing away ErrTaskAlreadyAdded. |
| - var all errors.MultiError |
| + all := errors.NewLazyMultiError(ops) |
| for i := 0; i < ops; i++ { |
| err := <-errs |
| - if merr, yep := err.(errors.MultiError); yep { |
| - for _, e := range merr { |
| - if e != nil && e != taskqueue.ErrTaskAlreadyAdded { |
| - all = append(all, e) |
| - } |
| - } |
| - } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded { |
| - all = append(all, err) |
| + if err != nil { |
| + all.Assign(i, err) |
| } |
| } |
| - if len(all) == 0 { |
| - return nil |
| + if err := flattenErrors(all.Get()); err != nil { |
| + return transient.Tag.Apply(err) |
| } |
| + return nil |
| +} |
| + |
| +// AddTask submits given tasks to an appropriate task queue. |
| +// |
| +// It means, at some later time in some other GAE process, callbacks registered |
| +// as handlers for corresponding proto types will be called. |
| +// |
| +// If the given context is transactional or namespaced, inherits the |
| +// transaction/namespace. Note if running outside of a transaction and multiple |
| +// tasks are passed, the operation is not atomic: it returns an error if at |
| +// least one enqueue operation failed (there's no way to figure out which one |
| +// exactly). |
| +// |
| +// Returns only transient errors. Unlike regular Task Queue's Add, |
| +// ErrTaskAlreadyAdded is not considered an error. |
| +func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error { |
| + return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue string, tasks []*taskqueue.Task) error { |
| + if err := taskqueue.Add(c, queue, tasks...); err != nil { |
| + return errors.Filter(err, taskqueue.ErrTaskAlreadyAdded) |
|
Vadim Sh.
2017/08/03 20:12:41
does this handle the case when err is taskqueue.Er
dnj
2017/08/03 20:56:27
yep
|
| + } |
| + return nil |
| + }) |
| +} |
| - return transient.Tag.Apply(all) |
| +// DeleteTask deletes the specified tasks from their queues. |
| +// |
| +// If the given context is transactional or namespaced, inherits the |
| +// transaction/namespace. Note if running outside of a transaction and multiple |
| +// tasks are passed, the operation is not atomic: it returns an error if at |
| +// least one enqueue operation failed (there's no way to figure out which one |
| +// exactly). |
| +// |
| +// Returns only transient errors. Unlike regular Task Queue's Delete, |
| +// attempts to delete an unknown or tombstoned task are not considered errors. |
| +func (d *Dispatcher) DeleteTask(c context.Context, tasks ...*Task) error { |
| + return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue string, tasks []*taskqueue.Task) error { |
| + return errors.FilterFunc(taskqueue.Delete(c, queue, tasks...), func(err error) bool { |
| + // Currently, the best way to detect an attempt to delete an unknown task |
| + // is to check the string with tolerable error message phrases. |
| + for _, phrase := range []string{"UNKNOWN_TASK", "TOMBSTONED_TASK"} { |
| + if strings.Contains(err.Error(), phrase) { |
| + return true |
| + } |
| + } |
| + return false |
| + }) |
| + }) |
| } |
| // InstallRoutes installs appropriate HTTP routes in the router. |
| @@ -276,23 +365,9 @@ func (d *Dispatcher) tqTask(task *Task) (*taskqueue.Task, string, error) { |
| retryOpts = task.RetryOptions |
| } |
| - // There's some weird restrictions on what characters are allowed inside task |
| - // names. Lexicographically close names also cause hot spot problems in the |
| - // Task Queues backend. To avoid these two issues, we always use SHA256 hashes |
| - // as task names. Also each task kind owns its own namespace of deduplication |
| - // keys, so add task type to the digest as well. |
| - name := "" |
| - if task.DeduplicationKey != "" { |
| - h := sha256.New() |
| - h.Write([]byte(handler.typeName)) |
| - h.Write([]byte{0}) |
| - h.Write([]byte(task.DeduplicationKey)) |
| - name = hex.EncodeToString(h.Sum(nil)) |
| - } |
| - |
| return &taskqueue.Task{ |
| Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title), |
| - Name: name, |
| + Name: task.Name(), |
| Method: "POST", |
| Payload: blob, |
| ETA: task.ETA, |
| @@ -414,3 +489,32 @@ func deserializePayload(blob []byte) (proto.Message, error) { |
| return task, nil |
| } |
| + |
| +//////////////////////////////////////////////////////////////////////////////// |
| + |
| +// flattenErrors collapses a multi-dimensional MultiError space into a flat |
| +// MultiError, removing "nil" errors. |
| +// |
| +// If err is not an errors.MultiError, will return err directly. |
| +// |
| +// As a special case, if merr contains no non-nil errors, nil will be returned. |
| +func flattenErrors(err error) error { |
| + var ret errors.MultiError |
| + flattenErrorsRec(&ret, err) |
| + if len(ret) == 0 { |
| + return nil |
| + } |
| + return ret |
| +} |
| + |
| +func flattenErrorsRec(ret *errors.MultiError, err error) { |
| + switch et := err.(type) { |
| + case nil: |
| + case errors.MultiError: |
| + for _, e := range et { |
| + flattenErrorsRec(ret, e) |
| + } |
| + default: |
| + *ret = append(*ret, et) |
| + } |
| +} |