OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package taskqueue | 5 package taskqueue |
6 | 6 |
7 // Interface is the full interface to the Task Queue service. | 7 import ( |
8 type Interface interface { | 8 » "github.com/luci/luci-go/common/errors" |
9 » Add(task *Task, queueName string) error | |
10 » Delete(task *Task, queueName string) error | |
11 | 9 |
12 » AddMulti(tasks []*Task, queueName string) error | 10 » "golang.org/x/net/context" |
13 » DeleteMulti(tasks []*Task, queueName string) error | 11 ) |
14 | 12 |
15 » // NOTE(riannucci): No support for pull taskqueues. We're not planning o
n | 13 // Add adds the specified task(s) to the specified task queue. |
16 » // making pull-queue clients which RUN in appengine (e.g. they'd all be | 14 // |
17 » // external REST consumers). If someone needs this, it will need to be a
dded | 15 // If only one task is provided its error will be returned directly. If more |
18 » // here and in RawInterface. The theory is that a good lease API might l
ook | 16 // than one task is provided, an errors.MultiError will be returned in the |
19 » // like: | 17 // event of an error, with a given error index corresponding to the error |
20 » // | 18 // encountered when processing the task at that index. |
21 » // func Lease(queueName, tag string, batchSize int, duration time.Time
, cb func(*Task, error<-)) | 19 func Add(c context.Context, queueName string, tasks ...*Task) error { |
22 » // | 20 » lme := errors.NewLazyMultiError(len(tasks)) |
23 » // Which blocks and calls cb for each task obtained. Lease would then do
all | 21 » i := 0 |
24 » // necessary backoff negotiation with the backend. The callback could ex
ecute | 22 » err := Raw(c).AddMulti(tasks, queueName, func(t *Task, err error) { |
25 » // synchronously (stuffing an error into the chan or panicing if it fail
s), or | 23 » » if !lme.Assign(i, err) { |
26 » // asynchronously (dispatching a goroutine which will then populate the
error | 24 » » » *tasks[i] = *t |
27 » // channel if needed). If it operates asynchronously, it has the option
of | 25 » » } |
28 » // processing multiple work items at a time. | 26 » » i++ |
29 » // | 27 » }) |
30 » // Lease would also take care of calling ModifyLease as necessary to ens
ure | 28 » if err == nil { |
31 » // that each call to cb would have 'duration' amount of time to work on
the | 29 » » err = lme.Get() |
32 » // task, as well as releasing as many leased tasks as it can on a failur
e. | 30 » » if len(tasks) == 1 { |
| 31 » » » err = errors.SingleError(err) |
| 32 » » } |
| 33 » } |
| 34 » return err |
| 35 } |
33 | 36 |
34 » Purge(queueName string) error | 37 // Delete deletes a task from the task queue. |
| 38 // |
| 39 // If only one task is provided its error will be returned directly. If more |
| 40 // than one task is provided, an errors.MultiError will be returned in the |
| 41 // event of an error, with a given error index corresponding to the error |
| 42 // encountered when processing the task at that index. |
| 43 func Delete(c context.Context, queueName string, tasks ...*Task) error { |
| 44 » lme := errors.NewLazyMultiError(len(tasks)) |
| 45 » i := 0 |
| 46 » err := Raw(c).DeleteMulti(tasks, queueName, func(err error) { |
| 47 » » lme.Assign(i, err) |
| 48 » » i++ |
| 49 » }) |
| 50 » if err == nil { |
| 51 » » err = lme.Get() |
| 52 » » if len(tasks) == 1 { |
| 53 » » » err = errors.SingleError(err) |
| 54 » » } |
| 55 » } |
| 56 » return err |
| 57 } |
35 | 58 |
36 » Stats(queueNames ...string) ([]Statistics, error) | 59 // NOTE(riannucci): No support for pull taskqueues. We're not planning on |
| 60 // making pull-queue clients which RUN in appengine (e.g. they'd all be |
| 61 // external REST consumers). If someone needs this, it will need to be added |
| 62 // here and in RawInterface. The theory is that a good lease API might look |
| 63 // like: |
| 64 // |
| 65 // func Lease(queueName, tag string, batchSize int, duration time.Time, cb fun
c(*Task, error<-)) |
| 66 // |
| 67 // Which blocks and calls cb for each task obtained. Lease would then do all |
| 68 // necessary backoff negotiation with the backend. The callback could execute |
| 69 // synchronously (stuffing an error into the chan or panicing if it fails), or |
| 70 // asynchronously (dispatching a goroutine which will then populate the error |
| 71 // channel if needed). If it operates asynchronously, it has the option of |
| 72 // processing multiple work items at a time. |
| 73 // |
| 74 // Lease would also take care of calling ModifyLease as necessary to ensure |
| 75 // that each call to cb would have 'duration' amount of time to work on the |
| 76 // task, as well as releasing as many leased tasks as it can on a failure. |
37 | 77 |
38 » Testable() Testable | 78 // Purge purges all tasks form the named queue. |
| 79 func Purge(c context.Context, queueName string) error { |
| 80 » return Raw(c).Purge(queueName) |
| 81 } |
39 | 82 |
40 » Raw() RawInterface | 83 // Stats returns Statistics instances for each of the named task queues. |
| 84 // |
| 85 // If only one task is provided its error will be returned directly. If more |
| 86 // than one task is provided, an errors.MultiError will be returned in the |
| 87 // event of an error, with a given error index corresponding to the error |
| 88 // encountered when processing the task at that index. |
| 89 func Stats(c context.Context, queueNames ...string) ([]Statistics, error) { |
| 90 » ret := make([]Statistics, len(queueNames)) |
| 91 » lme := errors.NewLazyMultiError(len(queueNames)) |
| 92 » i := 0 |
| 93 » err := Raw(c).Stats(queueNames, func(s *Statistics, err error) { |
| 94 » » if !lme.Assign(i, err) { |
| 95 » » » ret[i] = *s |
| 96 » » } |
| 97 » » i++ |
| 98 » }) |
| 99 » if err == nil { |
| 100 » » err = lme.Get() |
| 101 » » if len(queueNames) == 1 { |
| 102 » » » err = errors.SingleError(err) |
| 103 » » } |
| 104 » } |
| 105 » return ret, err |
41 } | 106 } |
| 107 |
| 108 // GetTestable returns a Testable for the current task queue service in c, or |
| 109 // nil if it does not offer one. |
| 110 func GetTestable(c context.Context) Testable { |
| 111 return Raw(c).GetTestable() |
| 112 } |
OLD | NEW |