Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(41)

Side by Side Diff: appengine/tq/tq.go

Issue 2986373002: [tq] Enable task deletion. (Closed)
Patch Set: comments Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine/tq/tq_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2017 The LUCI Authors. 1 // Copyright 2017 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
52 } 52 }
53 53
54 // Task contains task body and additional parameters that influence how it is 54 // Task contains task body and additional parameters that influence how it is
55 // routed. 55 // routed.
56 type Task struct { 56 type Task struct {
57 // Payload is task's payload as well as indicator of its type. 57 // Payload is task's payload as well as indicator of its type.
58 // 58 //
59 // Tasks are routed based on type of the payload message, see RegisterTa sk. 59 // Tasks are routed based on type of the payload message, see RegisterTa sk.
60 Payload proto.Message 60 Payload proto.Message
61 61
62 // NamePrefix, if not empty, is a string that will be prefixed to the ta sk's
63 // name. Characters in NamePrefix must be appropriate task queue name
64 // characters. NamePrefix can be useful because the Task Queue system al lows
65 // users to search for tasks by prefix.
66 //
67 // Lexicographically close names can cause hot spots in the Task Queues
68 // backend. If NamePrefix is specified, users should try and ensure that
69 // it is friendly to sharding (e.g., begins with a hash string).
70 //
71 // Setting NamePrefix and/or DeduplicationKey will result in a named tas k
72 // being generated. This task can be cancelled using DeleteTask.
73 NamePrefix string
74
62 // DeduplicationKey is optional unique key of the task. 75 // DeduplicationKey is optional unique key of the task.
63 // 76 //
64 // If a task of a given proto type with a given key has already been enq ueued 77 // If a task of a given proto type with a given key has already been enq ueued
65 // recently, this task will be silently ignored. 78 // recently, this task will be silently ignored.
66 // 79 //
67 // Such tasks can only be used outside of transactions. 80 // Such tasks can only be used outside of transactions.
81 //
82 // Setting NamePrefix and/or DeduplicationKey will result in a named tas k
83 // being generated. This task can be cancelled using DeleteTask.
68 DeduplicationKey string 84 DeduplicationKey string
69 85
70 // Title is optional string that identifies the task in HTTP logs. 86 // Title is optional string that identifies the task in HTTP logs.
71 // 87 //
72 // It will show up as a suffix in task handler URL. It exists exclusivel y to 88 // It will show up as a suffix in task handler URL. It exists exclusivel y to
73 // simplify reading HTTP logs. It serves no other purpose! In particular , 89 // simplify reading HTTP logs. It serves no other purpose! In particular ,
74 // it is NOT a task name. 90 // it is NOT a task name.
75 // 91 //
76 // Handlers won't ever see it. Pass all information through the task bod y. 92 // Handlers won't ever see it. Pass all information through the task bod y.
77 Title string 93 Title string
78 94
79 // Delay specifies the duration the task queue service must wait before 95 // Delay specifies the duration the task queue service must wait before
80 // executing the task. 96 // executing the task.
81 // 97 //
82 // Either Delay or ETA may be set, but not both. 98 // Either Delay or ETA may be set, but not both.
83 Delay time.Duration 99 Delay time.Duration
84 100
85 // ETA specifies the earliest time a task may be executed. 101 // ETA specifies the earliest time a task may be executed.
86 // 102 //
87 // Either Delay or ETA may be set, but not both. 103 // Either Delay or ETA may be set, but not both.
88 ETA time.Time 104 ETA time.Time
89 105
90 // Retry options for this task. 106 // Retry options for this task.
91 // 107 //
92 // If given, overrides default options set when this task was registered . 108 // If given, overrides default options set when this task was registered .
93 RetryOptions *taskqueue.RetryOptions 109 RetryOptions *taskqueue.RetryOptions
94 } 110 }
95 111
112 // Name generates and returns the task's name.
113 //
114 // If the task is not a named task (doesn't have NamePrefix or DeduplicationKey
115 // set), this will return an empty string.
116 func (task *Task) Name() string {
117 if task.NamePrefix == "" && task.DeduplicationKey == "" {
118 return ""
119 }
120
121 parts := make([]string, 0, 2)
122
123 if task.NamePrefix != "" {
124 parts = append(parts, task.NamePrefix)
125 }
126
127 // There's some weird restrictions on what characters are allowed inside task
128 // names. Lexicographically close names also cause hot spot problems in the
129 // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
130 // as task names. Also each task kind owns its own namespace of deduplic ation
131 // keys, so add task type to the digest as well.
132 if task.DeduplicationKey != "" {
133 h := sha256.New()
134 if task.Payload == nil {
135 panic("task must have a Payload")
136 }
137 h.Write([]byte(proto.MessageName(task.Payload)))
138 h.Write([]byte{0})
139 h.Write([]byte(task.DeduplicationKey))
140 parts = append(parts, hex.EncodeToString(h.Sum(nil)))
141 }
142
143 return strings.Join(parts, "-")
144 }
145
96 // Handler is called to handle one enqueued task. 146 // Handler is called to handle one enqueued task.
97 // 147 //
98 // The passed context is produced by a middleware chain installed with 148 // The passed context is produced by a middleware chain installed with
99 // InstallHandlers. 149 // InstallHandlers.
100 // 150 //
101 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is 151 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is
102 // 1 on first execution attempt, 2 on a retry, and so on. 152 // 1 on first execution attempt, 2 on a retry, and so on.
103 // 153 //
104 // May return transient errors. In this case, task queue may attempt to 154 // May return transient errors. In this case, task queue may attempt to
105 // redeliver the task (depending on RetryOptions). 155 // redeliver the task (depending on RetryOptions).
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
137 } 187 }
138 188
139 d.handlers[name] = handler{ 189 d.handlers[name] = handler{
140 cb: cb, 190 cb: cb,
141 typeName: name, 191 typeName: name,
142 queue: queue, 192 queue: queue,
143 retryOpts: opts, 193 retryOpts: opts,
144 } 194 }
145 } 195 }
146 196
147 // AddTask submits given tasks to an appropriate task queue. 197 // runBatchesPerQueue is a generic parallel task distributor. It solves the
198 // problems that:
199 //» - "tasks" may be assigned to different queues, and tasks assigned to the
200 // same queue should be batched together.
201 //» - Any given batch may exceed queue operation limits, and thus needs to b e
202 //» broken into multiple operations on sub-batches.
148 // 203 //
149 // It means, at some later time in some other GAE process, callbacks registered 204 // fn is called for each sub-batch assigned to each queue. All resulting errors
150 // as handlers for corresponding proto types will be called. 205 // are then flattened. If no fn invocation returns any errors, nil will be
151 // 206 // returned. If a single error is returned, this function will return that
152 // If the given context is transactional, inherits the transaction. Note if 207 // error. If an errors.MultiError is returned, it and any embedded
153 // running outside of a transaction and multiple tasks are passed, the operation 208 // MultiError (recursively) will be flattened into a single MultiError
154 // is not atomic: it returns an error if at least one enqueue operation failed 209 // containing only the non-nil errors. This simplifies user expectations.
155 // (there's no way to figure out which one exactly). 210 func (d *Dispatcher) runBatchesPerQueue(c context.Context, tasks []*Task,
156 // 211 » fn func(c context.Context, queue string, tasks []*taskqueue.Task) error) error {
157 // Returns only transient errors. Unlike regular Task Queue's Add, 212
158 // ErrTaskAlreadyAdded is not considered an error.
159 func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
160 if len(tasks) == 0 { 213 if len(tasks) == 0 {
161 return nil 214 return nil
162 } 215 }
163 216
164 // Handle the most common case of one task in a more efficient way. 217 // Handle the most common case of one task in a more efficient way.
165 if len(tasks) == 1 { 218 if len(tasks) == 1 {
166 t, queue, err := d.tqTask(tasks[0]) 219 t, queue, err := d.tqTask(tasks[0])
167 if err != nil { 220 if err != nil {
168 return err 221 return err
169 } 222 }
170 » » if err := taskqueue.Add(c, queue, t); err != nil { 223 » » if err := fn(c, queue, []*taskqueue.Task{t}); err != nil {
171 » » » if err == taskqueue.ErrTaskAlreadyAdded {
172 » » » » return nil
173 » » » }
174 return transient.Tag.Apply(err) 224 return transient.Tag.Apply(err)
175 } 225 }
176 return nil 226 return nil
177 } 227 }
178 228
179 perQueue := map[string][]*taskqueue.Task{} 229 perQueue := map[string][]*taskqueue.Task{}
180 for _, task := range tasks { 230 for _, task := range tasks {
181 t, queue, err := d.tqTask(task) 231 t, queue, err := d.tqTask(task)
182 if err != nil { 232 if err != nil {
183 return err 233 return err
184 } 234 }
185 perQueue[queue] = append(perQueue[queue], t) 235 perQueue[queue] = append(perQueue[queue], t)
186 } 236 }
187 237
188 // Enqueue in parallel, per-queue, split into batches based on Task Queu e 238 // Enqueue in parallel, per-queue, split into batches based on Task Queu e
189 // RPC limits (100 tasks per batch). 239 // RPC limits (100 tasks per batch).
240 const maxBatchSize = 100
190 errs := make(chan error) 241 errs := make(chan error)
191 ops := 0 242 ops := 0
192 for q, tasks := range perQueue { 243 for q, tasks := range perQueue {
193 for len(tasks) > 0 { 244 for len(tasks) > 0 {
194 » » » count := 100 245 » » » count := maxBatchSize
195 if count > len(tasks) { 246 if count > len(tasks) {
196 count = len(tasks) 247 count = len(tasks)
197 } 248 }
198 go func(q string, batch []*taskqueue.Task) { 249 go func(q string, batch []*taskqueue.Task) {
199 » » » » errs <- taskqueue.Add(c, q, batch...) 250 » » » » errs <- fn(c, q, batch)
200 }(q, tasks[:count]) 251 }(q, tasks[:count])
201 tasks = tasks[count:] 252 tasks = tasks[count:]
202 ops++ 253 ops++
203 } 254 }
204 } 255 }
205 256
206 » // Gather all errors throwing away ErrTaskAlreadyAdded. 257 » all := errors.NewLazyMultiError(ops)
207 » var all errors.MultiError
208 for i := 0; i < ops; i++ { 258 for i := 0; i < ops; i++ {
209 err := <-errs 259 err := <-errs
210 » » if merr, yep := err.(errors.MultiError); yep { 260 » » if err != nil {
211 » » » for _, e := range merr { 261 » » » all.Assign(i, err)
212 » » » » if e != nil && e != taskqueue.ErrTaskAlreadyAdde d {
213 » » » » » all = append(all, e)
214 » » » » }
215 » » » }
216 » » } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded {
217 » » » all = append(all, err)
218 } 262 }
219 } 263 }
220 264
221 » if len(all) == 0 { 265 » if err := flattenErrors(all.Get()); err != nil {
266 » » return transient.Tag.Apply(err)
267 » }
268 » return nil
269 }
270
271 // AddTask submits given tasks to an appropriate task queue.
272 //
273 // It means, at some later time in some other GAE process, callbacks registered
274 // as handlers for corresponding proto types will be called.
275 //
276 // If the given context is transactional or namespaced, inherits the
277 // transaction/namespace. Note if running outside of a transaction and multiple
278 // tasks are passed, the operation is not atomic: it returns an error if at
279 // least one enqueue operation failed (there's no way to figure out which one
280 // exactly).
281 //
282 // Returns only transient errors. Unlike regular Task Queue's Add,
283 // ErrTaskAlreadyAdded is not considered an error.
284 func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
285 » return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue stri ng, tasks []*taskqueue.Task) error {
286 » » if err := taskqueue.Add(c, queue, tasks...); err != nil {
287 » » » return errors.Filter(err, taskqueue.ErrTaskAlreadyAdded)
288 » » }
222 return nil 289 return nil
223 » } 290 » })
291 }
224 292
225 » return transient.Tag.Apply(all) 293 // DeleteTask deletes the specified tasks from their queues.
294 //
295 // If the given context is transactional or namespaced, inherits the
296 // transaction/namespace. Note if running outside of a transaction and multiple
297 // tasks are passed, the operation is not atomic: it returns an error if at
298 // least one enqueue operation failed (there's no way to figure out which one
299 // exactly).
300 //
301 // Returns only transient errors. Unlike regular Task Queue's Delete,
302 // attempts to delete an unknown or tombstoned task are not considered errors.
303 func (d *Dispatcher) DeleteTask(c context.Context, tasks ...*Task) error {
304 » return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue stri ng, tasks []*taskqueue.Task) error {
305 » » return errors.FilterFunc(taskqueue.Delete(c, queue, tasks...), f unc(err error) bool {
306 » » » // Currently, the best way to detect an attempt to delet e an unknown task
307 » » » // is to check the string with tolerable error message p hrases.
308 » » » for _, phrase := range []string{"UNKNOWN_TASK", "TOMBSTO NED_TASK"} {
309 » » » » if strings.Contains(err.Error(), phrase) {
310 » » » » » return true
311 » » » » }
312 » » » }
313 » » » return false
314 » » })
315 » })
226 } 316 }
227 317
228 // InstallRoutes installs appropriate HTTP routes in the router. 318 // InstallRoutes installs appropriate HTTP routes in the router.
229 // 319 //
230 // Must be called only after all task handlers are registered! 320 // Must be called only after all task handlers are registered!
231 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) { 321 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) {
232 queues := stringset.New(0) 322 queues := stringset.New(0)
233 323
234 d.mu.RLock() 324 d.mu.RLock()
235 for _, h := range d.handlers { 325 for _, h := range d.handlers {
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
269 title := handler.typeName 359 title := handler.typeName
270 if task.Title != "" { 360 if task.Title != "" {
271 title = task.Title 361 title = task.Title
272 } 362 }
273 363
274 retryOpts := handler.retryOpts 364 retryOpts := handler.retryOpts
275 if task.RetryOptions != nil { 365 if task.RetryOptions != nil {
276 retryOpts = task.RetryOptions 366 retryOpts = task.RetryOptions
277 } 367 }
278 368
279 // There's some weird restrictions on what characters are allowed inside task
280 // names. Lexicographically close names also cause hot spot problems in the
281 // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
282 // as task names. Also each task kind owns its own namespace of deduplic ation
283 // keys, so add task type to the digest as well.
284 name := ""
285 if task.DeduplicationKey != "" {
286 h := sha256.New()
287 h.Write([]byte(handler.typeName))
288 h.Write([]byte{0})
289 h.Write([]byte(task.DeduplicationKey))
290 name = hex.EncodeToString(h.Sum(nil))
291 }
292
293 return &taskqueue.Task{ 369 return &taskqueue.Task{
294 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title), 370 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title),
295 » » Name: name, 371 » » Name: task.Name(),
296 Method: "POST", 372 Method: "POST",
297 Payload: blob, 373 Payload: blob,
298 ETA: task.ETA, 374 ETA: task.ETA,
299 Delay: task.Delay, 375 Delay: task.Delay,
300 RetryOptions: retryOpts, 376 RetryOptions: retryOpts,
301 }, handler.queue, nil 377 }, handler.queue, nil
302 } 378 }
303 379
304 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher. 380 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher.
305 // 381 //
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
407 return nil, fmt.Errorf("no task body given") 483 return nil, fmt.Errorf("no task body given")
408 } 484 }
409 485
410 task := reflect.New(tp.Elem()).Interface().(proto.Message) 486 task := reflect.New(tp.Elem()).Interface().(proto.Message)
411 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil { 487 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil {
412 return nil, err 488 return nil, err
413 } 489 }
414 490
415 return task, nil 491 return task, nil
416 } 492 }
493
494 ////////////////////////////////////////////////////////////////////////////////
495
496 // flattenErrors collapses a multi-dimensional MultiError space into a flat
497 // MultiError, removing "nil" errors.
498 //
499 // If err is not an errors.MultiError, will return err directly.
500 //
501 // As a special case, if merr contains no non-nil errors, nil will be returned.
502 func flattenErrors(err error) error {
503 var ret errors.MultiError
504 flattenErrorsRec(&ret, err)
505 if len(ret) == 0 {
506 return nil
507 }
508 return ret
509 }
510
511 func flattenErrorsRec(ret *errors.MultiError, err error) {
512 switch et := err.(type) {
513 case nil:
514 case errors.MultiError:
515 for _, e := range et {
516 flattenErrorsRec(ret, e)
517 }
518 default:
519 *ret = append(*ret, et)
520 }
521 }
OLDNEW
« no previous file with comments | « no previous file | appengine/tq/tq_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698