Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(94)

Side by Side Diff: appengine/tq/tq.go

Issue 2986373002: [tq] Enable task deletion. (Closed)
Patch Set: reorder fields Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | appengine/tq/tq_test.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2017 The LUCI Authors. 1 // Copyright 2017 The LUCI Authors.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after
52 } 52 }
53 53
54 // Task contains task body and additional parameters that influence how it is 54 // Task contains task body and additional parameters that influence how it is
55 // routed. 55 // routed.
56 type Task struct { 56 type Task struct {
57 // Payload is task's payload as well as indicator of its type. 57 // Payload is task's payload as well as indicator of its type.
58 // 58 //
59 // Tasks are routed based on type of the payload message, see RegisterTa sk. 59 // Tasks are routed based on type of the payload message, see RegisterTa sk.
60 Payload proto.Message 60 Payload proto.Message
61 61
62 // NamePrefix, if not empty, is a string that will be prefixed to the ta sk's
63 // name. Characters in NamePrefix must be appropriate task queue name
64 // characters. NamePrefix can be useful because the Task Queue system al lows
65 // users to search for tasks by prefix.
66 //
67 // Lexicographically close names can cause hot spots in the Task Queues
68 // backend. If NamePrefix is specified, users should try and ensure that
69 // it is friendly to sharding (e.g., begins with a hash string).
70 //
71 // Setting NamePrefix and/or DeduplicationKey will result in a named tas k
72 // being generated. This task can be cancelled using DeleteTask.
73 NamePrefix string
74
62 // DeduplicationKey is optional unique key of the task. 75 // DeduplicationKey is optional unique key of the task.
63 // 76 //
64 // If a task of a given proto type with a given key has already been enq ueued 77 // If a task of a given proto type with a given key has already been enq ueued
65 // recently, this task will be silently ignored. 78 // recently, this task will be silently ignored.
66 // 79 //
67 // Such tasks can only be used outside of transactions. 80 // Such tasks can only be used outside of transactions.
81 //
82 // Setting NamePrefix and/or DeduplicationKey will result in a named tas k
83 // being generated. This task can be cancelled using DeleteTask.
68 DeduplicationKey string 84 DeduplicationKey string
69 85
70 // Title is optional string that identifies the task in HTTP logs. 86 // Title is optional string that identifies the task in HTTP logs.
71 // 87 //
72 // It will show up as a suffix in task handler URL. It exists exclusivel y to 88 // It will show up as a suffix in task handler URL. It exists exclusivel y to
73 // simplify reading HTTP logs. It serves no other purpose! In particular , 89 // simplify reading HTTP logs. It serves no other purpose! In particular ,
74 // it is NOT a task name. 90 // it is NOT a task name.
75 // 91 //
76 // Handlers won't ever see it. Pass all information through the task bod y. 92 // Handlers won't ever see it. Pass all information through the task bod y.
77 Title string 93 Title string
78 94
79 // Delay specifies the duration the task queue service must wait before 95 // Delay specifies the duration the task queue service must wait before
80 // executing the task. 96 // executing the task.
81 // 97 //
82 // Either Delay or ETA may be set, but not both. 98 // Either Delay or ETA may be set, but not both.
83 Delay time.Duration 99 Delay time.Duration
84 100
85 // ETA specifies the earliest time a task may be executed. 101 // ETA specifies the earliest time a task may be executed.
86 // 102 //
87 // Either Delay or ETA may be set, but not both. 103 // Either Delay or ETA may be set, but not both.
88 ETA time.Time 104 ETA time.Time
89 105
90 // Retry options for this task. 106 // Retry options for this task.
91 // 107 //
92 // If given, overrides default options set when this task was registered . 108 // If given, overrides default options set when this task was registered .
93 RetryOptions *taskqueue.RetryOptions 109 RetryOptions *taskqueue.RetryOptions
94 } 110 }
95 111
112 // Name generates and returns the task's name.
113 //
114 // If the task is not a named task (doesn't have NamePrefix or DeduplicationKey
115 // set), this will return an empty string.
116 func (task *Task) Name() string {
117 if task.NamePrefix == "" && task.DeduplicationKey == "" {
118 return ""
119 }
120
121 parts := make([]string, 0, 2)
122
123 if task.NamePrefix != "" {
124 parts = append(parts, task.NamePrefix)
125 }
126
127 // There's some weird restrictions on what characters are allowed inside task
128 // names. Lexicographically close names also cause hot spot problems in the
129 // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
130 // as task names. Also each task kind owns its own namespace of deduplic ation
131 // keys, so add task type to the digest as well.
132 if task.DeduplicationKey != "" {
133 h := sha256.New()
134 if task.Payload != nil {
Vadim Sh. 2017/08/03 20:12:41 tasks without payload are meaningless in this mode
dnj 2017/08/03 20:56:27 No, I suppose a panic is appropriate.
135 h.Write([]byte(proto.MessageName(task.Payload)))
136 }
137 h.Write([]byte{0})
138 h.Write([]byte(task.DeduplicationKey))
139 parts = append(parts, hex.EncodeToString(h.Sum(nil)))
140 }
141
142 return strings.Join(parts, "")
Vadim Sh. 2017/08/03 20:12:41 nit: let's use '-' as separator (if it's allowed i
dnj 2017/08/03 20:56:27 It is allowed. OK.
143 }
144
96 // Handler is called to handle one enqueued task. 145 // Handler is called to handle one enqueued task.
97 // 146 //
98 // The passed context is produced by a middleware chain installed with 147 // The passed context is produced by a middleware chain installed with
99 // InstallHandlers. 148 // InstallHandlers.
100 // 149 //
101 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is 150 // execCount corresponds to X-AppEngine-TaskExecutionCount header value: it is
102 // 1 on first execution attempt, 2 on a retry, and so on. 151 // 1 on first execution attempt, 2 on a retry, and so on.
103 // 152 //
104 // May return transient errors. In this case, task queue may attempt to 153 // May return transient errors. In this case, task queue may attempt to
105 // redeliver the task (depending on RetryOptions). 154 // redeliver the task (depending on RetryOptions).
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
137 } 186 }
138 187
139 d.handlers[name] = handler{ 188 d.handlers[name] = handler{
140 cb: cb, 189 cb: cb,
141 typeName: name, 190 typeName: name,
142 queue: queue, 191 queue: queue,
143 retryOpts: opts, 192 retryOpts: opts,
144 } 193 }
145 } 194 }
146 195
147 // AddTask submits given tasks to an appropriate task queue. 196 // runBatchesPerQueue is a generic parallel task distributor. It solves the
197 // problems that:
198 //» - "tasks" may be assigned to different queues, and tasks assigned to the
199 // same queue should be batched together.
200 //» - Any given batch may exceed queue operation limits, and thus needs to b e
201 //» broken into multiple operations on sub-batches.
148 // 202 //
149 // It means, at some later time in some other GAE process, callbacks registered 203 // fn is called for each sub-batch assigned to each queue. All resulting errors
150 // as handlers for corresponding proto types will be called. 204 // are then flattened. If no fn invocation returns any errors, nil will be
151 // 205 // returned. If a single error is returned, this function will return that
152 // If the given context is transactional, inherits the transaction. Note if 206 // error. If an errors.MultiError is returned, it and any embedded
153 // running outside of a transaction and multiple tasks are passed, the operation 207 // MultiError (recursively) will be flattened into a single MultiError
154 // is not atomic: it returns an error if at least one enqueue operation failed 208 // containing only the non-nil errors. This simplifies user expectations.
Vadim Sh. 2017/08/03 20:12:41 I did it this way because I had never seen we actu
dnj 2017/08/03 20:56:27 I think it's fine. If a user needs something more,
Vadim Sh. 2017/08/03 21:04:05 I'm just explaining why the code wasn't rigorous h
155 // (there's no way to figure out which one exactly). 209 func (d *Dispatcher) runBatchesPerQueue(c context.Context, tasks []*Task,
156 // 210 » fn func(c context.Context, queue string, tasks []*taskqueue.Task) error) error {
157 // Returns only transient errors. Unlike regular Task Queue's Add, 211
158 // ErrTaskAlreadyAdded is not considered an error.
159 func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
160 if len(tasks) == 0 { 212 if len(tasks) == 0 {
161 return nil 213 return nil
162 } 214 }
163 215
164 // Handle the most common case of one task in a more efficient way. 216 // Handle the most common case of one task in a more efficient way.
165 if len(tasks) == 1 { 217 if len(tasks) == 1 {
166 t, queue, err := d.tqTask(tasks[0]) 218 t, queue, err := d.tqTask(tasks[0])
167 if err != nil { 219 if err != nil {
168 return err 220 return err
169 } 221 }
170 » » if err := taskqueue.Add(c, queue, t); err != nil { 222 » » if err := fn(c, queue, []*taskqueue.Task{t}); err != nil {
171 » » » if err == taskqueue.ErrTaskAlreadyAdded {
172 » » » » return nil
173 » » » }
174 return transient.Tag.Apply(err) 223 return transient.Tag.Apply(err)
175 } 224 }
176 return nil 225 return nil
177 } 226 }
178 227
179 perQueue := map[string][]*taskqueue.Task{} 228 perQueue := map[string][]*taskqueue.Task{}
180 for _, task := range tasks { 229 for _, task := range tasks {
181 t, queue, err := d.tqTask(task) 230 t, queue, err := d.tqTask(task)
182 if err != nil { 231 if err != nil {
183 return err 232 return err
184 } 233 }
185 perQueue[queue] = append(perQueue[queue], t) 234 perQueue[queue] = append(perQueue[queue], t)
186 } 235 }
187 236
188 // Enqueue in parallel, per-queue, split into batches based on Task Queu e 237 // Enqueue in parallel, per-queue, split into batches based on Task Queu e
189 // RPC limits (100 tasks per batch). 238 // RPC limits (100 tasks per batch).
239 const maxBatchSize = 100
190 errs := make(chan error) 240 errs := make(chan error)
191 ops := 0 241 ops := 0
192 for q, tasks := range perQueue { 242 for q, tasks := range perQueue {
193 for len(tasks) > 0 { 243 for len(tasks) > 0 {
194 » » » count := 100 244 » » » count := maxBatchSize
195 if count > len(tasks) { 245 if count > len(tasks) {
196 count = len(tasks) 246 count = len(tasks)
197 } 247 }
198 go func(q string, batch []*taskqueue.Task) { 248 go func(q string, batch []*taskqueue.Task) {
199 » » » » errs <- taskqueue.Add(c, q, batch...) 249 » » » » errs <- fn(c, q, batch)
200 }(q, tasks[:count]) 250 }(q, tasks[:count])
201 tasks = tasks[count:] 251 tasks = tasks[count:]
202 ops++ 252 ops++
203 } 253 }
204 } 254 }
205 255
206 » // Gather all errors throwing away ErrTaskAlreadyAdded. 256 » all := errors.NewLazyMultiError(ops)
207 » var all errors.MultiError
208 for i := 0; i < ops; i++ { 257 for i := 0; i < ops; i++ {
209 err := <-errs 258 err := <-errs
210 » » if merr, yep := err.(errors.MultiError); yep { 259 » » if err != nil {
211 » » » for _, e := range merr { 260 » » » all.Assign(i, err)
212 » » » » if e != nil && e != taskqueue.ErrTaskAlreadyAdde d {
213 » » » » » all = append(all, e)
214 » » » » }
215 » » » }
216 » » } else if err != nil && err != taskqueue.ErrTaskAlreadyAdded {
217 » » » all = append(all, err)
218 } 261 }
219 } 262 }
220 263
221 » if len(all) == 0 { 264 » if err := flattenErrors(all.Get()); err != nil {
265 » » return transient.Tag.Apply(err)
266 » }
267 » return nil
268 }
269
270 // AddTask submits given tasks to an appropriate task queue.
271 //
272 // It means, at some later time in some other GAE process, callbacks registered
273 // as handlers for corresponding proto types will be called.
274 //
275 // If the given context is transactional or namespaced, inherits the
276 // transaction/namespace. Note if running outside of a transaction and multiple
277 // tasks are passed, the operation is not atomic: it returns an error if at
278 // least one enqueue operation failed (there's no way to figure out which one
279 // exactly).
280 //
281 // Returns only transient errors. Unlike regular Task Queue's Add,
282 // ErrTaskAlreadyAdded is not considered an error.
283 func (d *Dispatcher) AddTask(c context.Context, tasks ...*Task) error {
284 » return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue stri ng, tasks []*taskqueue.Task) error {
285 » » if err := taskqueue.Add(c, queue, tasks...); err != nil {
286 » » » return errors.Filter(err, taskqueue.ErrTaskAlreadyAdded)
Vadim Sh. 2017/08/03 20:12:41 does this handle the case when err is taskqueue.Er
dnj 2017/08/03 20:56:27 yep
287 » » }
222 return nil 288 return nil
223 » } 289 » })
290 }
224 291
225 » return transient.Tag.Apply(all) 292 // DeleteTask deletes the specified tasks from their queues.
293 //
294 // If the given context is transactional or namespaced, inherits the
295 // transaction/namespace. Note if running outside of a transaction and multiple
296 // tasks are passed, the operation is not atomic: it returns an error if at
297 // least one enqueue operation failed (there's no way to figure out which one
298 // exactly).
299 //
300 // Returns only transient errors. Unlike regular Task Queue's Delete,
301 // attempts to delete an unknown or tombstoned task are not considered errors.
302 func (d *Dispatcher) DeleteTask(c context.Context, tasks ...*Task) error {
303 » return d.runBatchesPerQueue(c, tasks, func(c context.Context, queue stri ng, tasks []*taskqueue.Task) error {
304 » » return errors.FilterFunc(taskqueue.Delete(c, queue, tasks...), f unc(err error) bool {
305 » » » // Currently, the best way to detect an attempt to delet e an unknown task
306 » » » // is to check the string with tolerable error message p hrases.
307 » » » for _, phrase := range []string{"UNKNOWN_TASK", "TOMBSTO NED_TASK"} {
308 » » » » if strings.Contains(err.Error(), phrase) {
309 » » » » » return true
310 » » » » }
311 » » » }
312 » » » return false
313 » » })
314 » })
226 } 315 }
227 316
228 // InstallRoutes installs appropriate HTTP routes in the router. 317 // InstallRoutes installs appropriate HTTP routes in the router.
229 // 318 //
230 // Must be called only after all task handlers are registered! 319 // Must be called only after all task handlers are registered!
231 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) { 320 func (d *Dispatcher) InstallRoutes(r *router.Router, mw router.MiddlewareChain) {
232 queues := stringset.New(0) 321 queues := stringset.New(0)
233 322
234 d.mu.RLock() 323 d.mu.RLock()
235 for _, h := range d.handlers { 324 for _, h := range d.handlers {
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
269 title := handler.typeName 358 title := handler.typeName
270 if task.Title != "" { 359 if task.Title != "" {
271 title = task.Title 360 title = task.Title
272 } 361 }
273 362
274 retryOpts := handler.retryOpts 363 retryOpts := handler.retryOpts
275 if task.RetryOptions != nil { 364 if task.RetryOptions != nil {
276 retryOpts = task.RetryOptions 365 retryOpts = task.RetryOptions
277 } 366 }
278 367
279 // There's some weird restrictions on what characters are allowed inside task
280 // names. Lexicographically close names also cause hot spot problems in the
281 // Task Queues backend. To avoid these two issues, we always use SHA256 hashes
282 // as task names. Also each task kind owns its own namespace of deduplic ation
283 // keys, so add task type to the digest as well.
284 name := ""
285 if task.DeduplicationKey != "" {
286 h := sha256.New()
287 h.Write([]byte(handler.typeName))
288 h.Write([]byte{0})
289 h.Write([]byte(task.DeduplicationKey))
290 name = hex.EncodeToString(h.Sum(nil))
291 }
292
293 return &taskqueue.Task{ 368 return &taskqueue.Task{
294 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title), 369 Path: fmt.Sprintf("%s%s/%s", d.baseURL(), handler.queue, title),
295 » » Name: name, 370 » » Name: task.Name(),
296 Method: "POST", 371 Method: "POST",
297 Payload: blob, 372 Payload: blob,
298 ETA: task.ETA, 373 ETA: task.ETA,
299 Delay: task.Delay, 374 Delay: task.Delay,
300 RetryOptions: retryOpts, 375 RetryOptions: retryOpts,
301 }, handler.queue, nil 376 }, handler.queue, nil
302 } 377 }
303 378
304 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher. 379 // baseURL returns a URL prefix for all HTTP routes used by Dispatcher.
305 // 380 //
(...skipping 101 matching lines...) Expand 10 before | Expand all | Expand 10 after
407 return nil, fmt.Errorf("no task body given") 482 return nil, fmt.Errorf("no task body given")
408 } 483 }
409 484
410 task := reflect.New(tp.Elem()).Interface().(proto.Message) 485 task := reflect.New(tp.Elem()).Interface().(proto.Message)
411 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil { 486 if err := jsonpb.Unmarshal(bytes.NewReader(*env.Body), task); err != nil {
412 return nil, err 487 return nil, err
413 } 488 }
414 489
415 return task, nil 490 return task, nil
416 } 491 }
492
493 ////////////////////////////////////////////////////////////////////////////////
494
495 // flattenErrors collapses a multi-dimensional MultiError space into a flat
496 // MultiError, removing "nil" errors.
497 //
498 // If err is not an errors.MultiError, will return err directly.
499 //
500 // As a special case, if merr contains no non-nil errors, nil will be returned.
501 func flattenErrors(err error) error {
502 var ret errors.MultiError
503 flattenErrorsRec(&ret, err)
504 if len(ret) == 0 {
505 return nil
506 }
507 return ret
508 }
509
510 func flattenErrorsRec(ret *errors.MultiError, err error) {
511 switch et := err.(type) {
512 case nil:
513 case errors.MultiError:
514 for _, e := range et {
515 flattenErrorsRec(ret, e)
516 }
517 default:
518 *ret = append(*ret, et)
519 }
520 }
OLDNEW
« no previous file with comments | « no previous file | appengine/tq/tq_test.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698