| Index: appengine/tq/tq.go
|
| diff --git a/appengine/tq/tq.go b/appengine/tq/tq.go
|
| index 6f974ddf8d03bde018da16042fa4c11f7f526bec..397c46506bf6dcb6c238b39a414d32751a832274 100644
|
| --- a/appengine/tq/tq.go
|
| +++ b/appengine/tq/tq.go
|
| @@ -59,12 +59,28 @@ type Task struct {
|
| // Tasks are routed based on type of the payload message, see RegisterTask.
|
| Payload proto.Message
|
|
|
| + // NamePrefix, if not empty, is a string that will be prefixed to the task's
|
| + // name. Characters in NamePrefix must be appropriate task queue name
|
| + // characters. NamePrefix can be useful because the Task Queue system allows
|
| + // users to search for tasks by prefix.
|
| + //
|
| + // Lexicographically close names can cause hot spots in the Task Queues
|
| + // backend. If NamePrefix is specified, users should try and ensure that
|
| + // it is friendly to sharding (e.g., begins with a hash string).
|
| + //
|
| + // Setting NamePrefix and/or DeduplicationKey will result in a named task
|
| + // being generated. This task can be cancelled using DeleteTask.
|
| + NamePrefix string
|
| +
|
| // DeduplicationKey is optional unique key of the task.
|
| //
|
| // If a task of a given proto type with a given key has already been enqueued
|
| // recently, this task will be silently ignored.
|
| //
|
| // Such tasks can only be used outside of transactions.
|
| + //
|
| + // Setting NamePrefix and/or DeduplicationKey will result in a named task
|
| + // being generated. This task can be cancelled using DeleteTask.
|
| DeduplicationKey string
|
|
|
| // Title is optional string that identifies the task in HTTP logs.
|
| @@ -93,6 +109,40 @@ type Task struct {
|
| RetryOptions *taskqueue.RetryOptions
|
| }
|
|
|
| +// Name generates and returns the task's name.
|
| +//
|
| +// If the task is not a named task (doesn't have NamePrefix or DeduplicationKey
|
| +// set), this will return an empty string.
|
| +func (task *Task) Name() string {
|
| + if task.NamePrefix == "" && task.DeduplicationKey == "" {
|
| + return ""
|
| + }
|
| +
|
| + parts := make([]string, 0, 2)
|
| +
|
| + if task.NamePrefix != "" {
|
| + parts = append(parts, task.NamePrefix)
|
| + }
|
| +
|
| + // There's some weird restrictions on what characters are allowed inside task
|
| + // names. Lexicographically close names also cause hot spot problems in the
|
| + // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
|
| + // as task names. Also each task kind owns its own namespace of deduplication
|
| + // keys, so add task type to the digest as well.
|
| + if task.DeduplicationKey != "" {
|
| + h := sha256.New()
|
| + if task.Payload == nil {
|
| + panic("task must have a Payload")
|
| + }
|
| + h.Write([]byte(proto.MessageName(task.Payload)))
|
| + h.Write([]byte{0})
|
| + h.Write([]byte(task.DeduplicationKey))
|
| + parts = append(parts, hex.EncodeToString(h.Sum(nil)))
|
| + }
|
| +
|
| + return strings.Join(parts, "-")
|
| +}
|
| +
|
| // Handler is called to handle one enqueued task.
|
| //
|
| // The passed context is produced by a middleware chain installed with
|
| @@ -144,19 +194,22 @@ func (d *Dispatcher) RegisterTask(prototype proto.Message, cb Handler, queue str
|
| }
|
| }
|
|
|
| -// AddTask submits given tasks to an appropriate task queue.
|
| -//
|
| -// It means, at some later time in some other GAE process, callbacks registered
|
| -// as handlers for corresponding proto types will be called.
|
| -//
|
| -// If the given context is transactional, inherits the transaction. Note if
|
| -// running outside of a transaction and multiple tasks are passed, the operation
|
| -// is not atomic: it returns an error if at least one enqueue operation failed
|
| -// (there's no way to figure out which one exactly).
|
| +// runBatchesPerQueue is a generic parallel task distributor. It solves the
|
| +// problems that:
|
| +// - "tasks" may be assigned to different queues, and tasks assigned to the
|
| +// same queue should be batched together.
|
| +// - Any given batch may exceed queue operation limits, and thus needs to be
|
| +// broken into multiple operations on sub-batches.
|
| //
|
| -// Returns only transient errors. Unlike regular Task Queue's Add,
|
| -// ErrTaskAlreadyAdded is not considered an error.
|
| -func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
|
| +// fn is called for each sub-batch assigned to each queue. All resulting errors
|
| +// are then flattened. If no fn invocation returns any errors, nil will be
|
| +// returned. If a single error is returned, this function will return that
|
| +// error. If an errors.MultiError is returned, it and any embedded
|
| +// MultiError (recursively) will be flattened into a single MultiError
|
| +// containing only the non-nil errors. This simplifies user expectations.
|
| +func (d *Dispatcher) runBatchesPerQueue(c context.Context, tasks []*Task,
|
| + fn func(c context.Context, queue string, tasks []*taskqueue.Task) error) error {
|
| +
|
| if len(tasks) == 0 {
|
| return nil
|
| }
|
| @@ -167,10 +220,7 @@ func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
|
| if err != nil {
|
| return err
|
| }
|
| - if err := taskqueue.Add(c, queue, t); err != nil {
|
| - if err == taskqueue.ErrTaskAlreadyAdded {
|
| - return nil
|
| - }
|
| + if err := fn(c, queue, []*taskqueue.Task{t}); err != nil {
|
| return transient.Tag.Apply(err)
|
| }
|
| return nil
|
| @@ -187,42 +237,82 @@ func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
|
|
|
| // Enqueue in parallel, per-queue, split into batches based on Task Queue
|
| // RPC limits (100 tasks per batch).
|
| + const maxBatchSize = 100
|
| errs := make(chan error)
|
| ops := 0
|
| for q, tasks := range perQueue {
|
| for len(tasks) > 0 {
|
| - count := 100
|
| + count := maxBatchSize
|
| if count > len(tasks) {
|
| count = len(tasks)
|
| }
|
| go func(q string, batch []*taskqueue.Task) {
|
| - errs <- taskqueue.Add(c, q, batch...)
|
| + errs <- fn(c, q, batch)
|
| }(q, tasks[:count])
|
| tasks = tasks[count:]
|
| ops++
|
| }
|
| }
|
|
|
| - // Gather all errors throwing away ErrTaskAlreadyAdded.
|
| - var all errors.MultiError
|
| + all := errors.NewLazyMultiError(ops)
|
| for i := 0; i < ops; i++ {
|
| err := <-errs
|
| - if merr, yep := err.(errors.MultiError); yep {
|
| - for _, e := range merr {
|
| - if e != nil && e != taskqueue.ErrTaskAlreadyAdded {
|
| - all = append(all, e)
|
| - }
|
| - }
|
| - } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded {
|
| - all = append(all, err)
|
| + if err != nil {
|
| + all.Assign(i, err)
|
| }
|
| }
|
|
|
| - if len(all) == 0 {
|
| - return nil
|
| + if err := flattenErrors(all.Get()); err != nil {
|
| + return transient.Tag.Apply(err)
|
| }
|
| + return nil
|
| +}
|
| +
|
| +// AddTask submits given tasks to an appropriate task queue.
|
| +//
|
| +// It means, at some later time in some other GAE process, callbacks registered
|
| +// as handlers for corresponding proto types will be called.
|
| +//
|
| +// If the given context is transactional or namespaced, inherits the
|
| +// transaction/namespace. Note if running outside of a transaction and multiple
|
| +// tasks are passed, the operation is not atomic: it returns an error if at
|
| +// least one enqueue operation failed (there's no way to figure out which one
|
| +// exactly).
|
| +//
|
| +// Returns only transient errors. Unlike regular Task Queue's Add,
|
| +// ErrTaskAlreadyAdded is not considered an error.
|
| +func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
|
| + return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue string, tasks []*taskqueue.Task) error {
|
| + if err := taskqueue.Add(c, queue, tasks...); err != nil {
|
| + return errors.Filter(err, taskqueue.ErrTaskAlreadyAdded)
|
| + }
|
| + return nil
|
| + })
|
| +}
|
|
|
| - return transient.Tag.Apply(all)
|
| +// DeleteTask deletes the specified tasks from their queues.
|
| +//
|
| +// If the given context is transactional or namespaced, inherits the
|
| +// transaction/namespace. Note if running outside of a transaction and multiple
|
| +// tasks are passed, the operation is not atomic: it returns an error if at
|
| +// least one enqueue operation failed (there's no way to figure out which one
|
| +// exactly).
|
| +//
|
| +// Returns only transient errors. Unlike regular Task Queue's Delete,
|
| +// attempts to delete an unknown or tombstoned task are not considered errors.
|
| +func (d *Dispatcher) DeleteTask(c context.Context, tasks ...*Task) error {
|
| + return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue string, tasks []*taskqueue.Task) error {
|
| + return errors.FilterFunc(taskqueue.Delete(c, queue, tasks...), func(err error) bool {
|
| + // Currently, the best way to detect an attempt to delete an unknown task
|
| + // is to check the string with tolerable error message phrases.
|
| + for _, phrase := range []string{"UNKNOWN_TASK", "TOMBSTONED_TASK"} {
|
| + if strings.Contains(err.Error(), phrase) {
|
| + return true
|
| + }
|
| + }
|
| + return false
|
| + })
|
| + })
|
| }
|
|
|
| // InstallRoutes installs appropriate HTTP routes in the router.
|
| @@ -276,23 +366,9 @@ func (d *Dispatcher) tqTask(task *Task) (*taskqueue.Task, string, error) {
|
| retryOpts = task.RetryOptions
|
| }
|
|
|
| - // There's some weird restrictions on what characters are allowed inside task
|
| - // names. Lexicographically close names also cause hot spot problems in the
|
| - // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
|
| - // as task names. Also each task kind owns its own namespace of deduplication
|
| - // keys, so add task type to the digest as well.
|
| - name := ""
|
| - if task.DeduplicationKey != "" {
|
| - h := sha256.New()
|
| - h.Write([]byte(handler.typeName))
|
| - h.Write([]byte{0})
|
| - h.Write([]byte(task.DeduplicationKey))
|
| - name = hex.EncodeToString(h.Sum(nil))
|
| - }
|
| -
|
| return &taskqueue.Task{
|
| Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title),
|
| - Name: name,
|
| + Name: task.Name(),
|
| Method: "POST",
|
| Payload: blob,
|
| ETA: task.ETA,
|
| @@ -414,3 +490,32 @@ func deserializePayload(blob []byte) (proto.Message, error) {
|
|
|
| return task, nil
|
| }
|
| +
|
| +////////////////////////////////////////////////////////////////////////////////
|
| +
|
| +// flattenErrors collapses a multi-dimensional MultiError space into a flat
|
| +// MultiError, removing "nil" errors.
|
| +//
|
| +// If err is not an errors.MultiError, will return err directly.
|
| +//
|
| +// As a special case, if merr contains no non-nil errors, nil will be returned.
|
| +func flattenErrors(err error) error {
|
| + var ret errors.MultiError
|
| + flattenErrorsRec(&ret, err)
|
| + if len(ret) == 0 {
|
| + return nil
|
| + }
|
| + return ret
|
| +}
|
| +
|
| +func flattenErrorsRec(ret *errors.MultiError, err error) {
|
| + switch et := err.(type) {
|
| + case nil:
|
| + case errors.MultiError:
|
| + for _, e := range et {
|
| + flattenErrorsRec(ret, e)
|
| + }
|
| + default:
|
| + *ret = append(*ret, et)
|
| + }
|
| +}
|
|
|