Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(2387)

Unified Diff: appengine/tq/tq.go

Issue 2986373002: [tq] Enable task deletion. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | appengine/tq/tq_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: appengine/tq/tq.go
diff --git a/appengine/tq/tq.go b/appengine/tq/tq.go
index 6f974ddf8d03bde018da16042fa4c11f7f526bec..397c46506bf6dcb6c238b39a414d32751a832274 100644
--- a/appengine/tq/tq.go
+++ b/appengine/tq/tq.go
@@ -59,12 +59,28 @@ type Task struct {
// Tasks are routed based on type of the payload message, see RegisterTask.
Payload proto.Message
+ // NamePrefix, if not empty, is a string that will be prefixed to the task's
+ // name. Characters in NamePrefix must be appropriate task queue name
+ // characters. NamePrefix can be useful because the Task Queue system allows
+ // users to search for tasks by prefix.
+ //
+ // Lexicographically close names can cause hot spots in the Task Queues
+ // backend. If NamePrefix is specified, users should try to ensure that
+ // it is friendly to sharding (e.g., begins with a hash string).
+ //
+ // Setting NamePrefix and/or DeduplicationKey will result in a named task
+ // being generated. This task can be cancelled using DeleteTask.
+ NamePrefix string
+
// DeduplicationKey is optional unique key of the task.
//
// If a task of a given proto type with a given key has already been enqueued
// recently, this task will be silently ignored.
//
// Such tasks can only be used outside of transactions.
+ //
+ // Setting NamePrefix and/or DeduplicationKey will result in a named task
+ // being generated. This task can be cancelled using DeleteTask.
DeduplicationKey string
// Title is optional string that identifies the task in HTTP logs.
@@ -93,6 +109,40 @@ type Task struct {
RetryOptions *taskqueue.RetryOptions
}
+// Name generates and returns the task's name.
+//
+// If the task is not a named task (doesn't have NamePrefix or DeduplicationKey
+// set), this will return an empty string.
+func (task *Task) Name() string {
+ if task.NamePrefix == "" && task.DeduplicationKey == "" {
+ return ""
+ }
+
+ parts := make([]string, 0, 2)
+
+ if task.NamePrefix != "" {
+ parts = append(parts, task.NamePrefix)
+ }
+
+ // There are some weird restrictions on what characters are allowed inside task
+ // names. Lexicographically close names also cause hot spot problems in the
+ // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
+ // as task names. Also each task kind owns its own namespace of deduplication
+ // keys, so add task type to the digest as well.
+ if task.DeduplicationKey != "" {
+ h := sha256.New()
+ if task.Payload == nil {
+ panic("task must have a Payload")
+ }
+ h.Write([]byte(proto.MessageName(task.Payload)))
+ h.Write([]byte{0})
+ h.Write([]byte(task.DeduplicationKey))
+ parts = append(parts, hex.EncodeToString(h.Sum(nil)))
+ }
+
+ return strings.Join(parts, "-")
+}
+
// Handler is called to handle one enqueued task.
//
// The passed context is produced by a middleware chain installed with
@@ -144,19 +194,22 @@ func (d *Dispatcher) RegisterTask(prototype proto.Message, cb Handler, queue str
}
}
-// AddTask submits given tasks to an appropriate task queue.
-//
-// It means, at some later time in some other GAE process, callbacks registered
-// as handlers for corresponding proto types will be called.
-//
-// If the given context is transactional, inherits the transaction. Note if
-// running outside of a transaction and multiple tasks are passed, the operation
-// is not atomic: it returns an error if at least one enqueue operation failed
-// (there's no way to figure out which one exactly).
+// runBatchesPerQueue is a generic parallel task distributor. It solves the
+// problems that:
+// - "tasks" may be assigned to different queues, and tasks assigned to the
+// same queue should be batched together.
+// - Any given batch may exceed queue operation limits, and thus needs to be
+// broken into multiple operations on sub-batches.
//
-// Returns only transient errors. Unlike regular Task Queue's Add,
-// ErrTaskAlreadyAdded is not considered an error.
-func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
+// fn is called for each sub-batch assigned to each queue. If no fn invocation
+// returns an error, nil will be returned. Otherwise, for batches run in
+// parallel, every non-nil error, including those nested (recursively) inside
+// an errors.MultiError, is collected into a single flat errors.MultiError,
+// which is returned tagged as transient. This simplifies user expectations.
+func (d *Dispatcher) runBatchesPerQueue(c context.Context, tasks []*Task,
+ fn func(c context.Context, queue string, tasks []*taskqueue.Task) error) error {
+
if len(tasks) == 0 {
return nil
}
@@ -167,10 +220,7 @@ func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
if err != nil {
return err
}
- if err := taskqueue.Add(c, queue, t); err != nil {
- if err == taskqueue.ErrTaskAlreadyAdded {
- return nil
- }
+ if err := fn(c, queue, []*taskqueue.Task{t}); err != nil {
return transient.Tag.Apply(err)
}
return nil
@@ -187,42 +237,82 @@ func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
// Enqueue in parallel, per-queue, split into batches based on Task Queue
// RPC limits (100 tasks per batch).
+ const maxBatchSize = 100
errs := make(chan error)
ops := 0
for q, tasks := range perQueue {
for len(tasks) > 0 {
- count := 100
+ count := maxBatchSize
if count > len(tasks) {
count = len(tasks)
}
go func(q string, batch []*taskqueue.Task) {
- errs <- taskqueue.Add(c, q, batch...)
+ errs <- fn(c, q, batch)
}(q, tasks[:count])
tasks = tasks[count:]
ops++
}
}
- // Gather all errors throwing away ErrTaskAlreadyAdded.
- var all errors.MultiError
+ all := errors.NewLazyMultiError(ops)
for i := 0; i < ops; i++ {
err := <-errs
- if merr, yep := err.(errors.MultiError); yep {
- for _, e := range merr {
- if e != nil && e != taskqueue.ErrTaskAlreadyAdded {
- all = append(all, e)
- }
- }
- } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded {
- all = append(all, err)
+ if err != nil {
+ all.Assign(i, err)
}
}
- if len(all) == 0 {
- return nil
+ if err := flattenErrors(all.Get()); err != nil {
+ return transient.Tag.Apply(err)
}
+ return nil
+}
+
+// AddTask submits given tasks to an appropriate task queue.
+//
+// It means, at some later time in some other GAE process, callbacks registered
+// as handlers for corresponding proto types will be called.
+//
+// If the given context is transactional or namespaced, inherits the
+// transaction/namespace. Note if running outside of a transaction and multiple
+// tasks are passed, the operation is not atomic: it returns an error if at
+// least one enqueue operation failed (there's no way to figure out which one
+// exactly).
+//
+// Returns only transient errors. Unlike regular Task Queue's Add,
+// ErrTaskAlreadyAdded is not considered an error.
+func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
+ return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue string, tasks []*taskqueue.Task) error {
+ if err := taskqueue.Add(c, queue, tasks...); err != nil {
+ return errors.Filter(err, taskqueue.ErrTaskAlreadyAdded)
+ }
+ return nil
+ })
+}
- return transient.Tag.Apply(all)
+// DeleteTask deletes the specified tasks from their queues.
+//
+// If the given context is transactional or namespaced, inherits the
+// transaction/namespace. Note if running outside of a transaction and multiple
+// tasks are passed, the operation is not atomic: it returns an error if at
+// least one delete operation failed (there's no way to figure out which one
+// exactly).
+//
+// Returns only transient errors. Unlike regular Task Queue's Delete,
+// attempts to delete an unknown or tombstoned task are not considered errors.
+func (d *Dispatcher) DeleteTask(c context.Context, tasks ...*Task) error {
+ return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue string, tasks []*taskqueue.Task) error {
+ return errors.FilterFunc(taskqueue.Delete(c, queue, tasks...), func(err error) bool {
+ // Currently, the best way to detect an attempt to delete an unknown task
+ // is to check the error string for known, tolerable error message phrases.
+ for _, phrase := range []string{"UNKNOWN_TASK", "TOMBSTONED_TASK"} {
+ if strings.Contains(err.Error(), phrase) {
+ return true
+ }
+ }
+ return false
+ })
+ })
}
// InstallRoutes installs appropriate HTTP routes in the router.
@@ -276,23 +366,9 @@ func (d *Dispatcher) tqTask(task *Task) (*taskqueue.Task, string, error) {
retryOpts = task.RetryOptions
}
- // There's some weird restrictions on what characters are allowed inside task
- // names. Lexicographically close names also cause hot spot problems in the
- // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
- // as task names. Also each task kind owns its own namespace of deduplication
- // keys, so add task type to the digest as well.
- name := ""
- if task.DeduplicationKey != "" {
- h := sha256.New()
- h.Write([]byte(handler.typeName))
- h.Write([]byte{0})
- h.Write([]byte(task.DeduplicationKey))
- name = hex.EncodeToString(h.Sum(nil))
- }
-
return &taskqueue.Task{
Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title),
- Name: name,
+ Name: task.Name(),
Method: "POST",
Payload: blob,
ETA: task.ETA,
@@ -414,3 +490,32 @@ func deserializePayload(blob []byte) (proto.Message, error) {
return task, nil
}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// flattenErrors collapses a multi-dimensional MultiError space into a flat
+// MultiError, removing "nil" errors.
+//
+// If err is a non-nil error that is not an errors.MultiError, it is returned
+// wrapped in a single-element MultiError.
+//
+// As a special case, if err contains no non-nil errors, nil will be returned.
+func flattenErrors(err error) error {
+ var ret errors.MultiError
+ flattenErrorsRec(&ret, err)
+ if len(ret) == 0 {
+ return nil
+ }
+ return ret
+}
+
+func flattenErrorsRec(ret *errors.MultiError, err error) {
+ switch et := err.(type) {
+ case nil:
+ case errors.MultiError:
+ for _, e := range et {
+ flattenErrorsRec(ret, e)
+ }
+ default:
+ *ret = append(*ret, et)
+ }
+}
« no previous file with comments | « no previous file | appengine/tq/tq_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698