OLD | NEW |
1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
4 | 4 |
5 package taskqueue | 5 package taskqueue |
6 | 6 |
7 import ( | 7 import ( |
| 8 "time" |
| 9 |
8 "github.com/luci/luci-go/common/errors" | 10 "github.com/luci/luci-go/common/errors" |
9 | 11 |
10 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
11 ) | 13 ) |
12 | 14 |
13 // Add adds the specified task(s) to the specified task queue. | 15 // Add adds the specified task(s) to the specified task queue. |
14 // | 16 // |
15 // If only one task is provided its error will be returned directly. If more | 17 // If only one task is provided its error will be returned directly. If more |
16 // than one task is provided, an errors.MultiError will be returned in the | 18 // than one task is provided, an errors.MultiError will be returned in the |
17 // event of an error, with a given error index corresponding to the error | 19 // event of an error, with a given error index corresponding to the error |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
49 }) | 51 }) |
50 if err == nil { | 52 if err == nil { |
51 err = lme.Get() | 53 err = lme.Get() |
52 if len(tasks) == 1 { | 54 if len(tasks) == 1 { |
53 err = errors.SingleError(err) | 55 err = errors.SingleError(err) |
54 } | 56 } |
55 } | 57 } |
56 return err | 58 return err |
57 } | 59 } |
58 | 60 |
59 // NOTE(riannucci): No support for pull taskqueues. We're not planning on | 61 // NOTE(riannucci): Pull task queues API can be extended to support automatic |
60 // making pull-queue clients which RUN in appengine (e.g. they'd all be | 62 // lease management. |
61 // external REST consumers). If someone needs this, it will need to be added | 63 // |
62 // here and in RawInterface. The theory is that a good lease API might look | 64 // The theory is that a good lease API might look like: |
63 // like: | |
64 // | 65 // |
65 // func Lease(queueName, tag string, batchSize int, duration time.Time, cb fun
c(*Task, error<-)) | 66 // func Lease(queueName, tag string, batchSize int, duration time.Time, cb fun
c(*Task, error<-)) |
66 // | 67 // |
67 // Which blocks and calls cb for each task obtained. Lease would then do all | 68 // Which blocks and calls cb for each task obtained. Lease would then do all |
68 // necessary backoff negotiation with the backend. The callback could execute | 69 // necessary backoff negotiation with the backend. The callback could execute |
69 // synchronously (stuffing an error into the chan or panicing if it fails), or | 70 // synchronously (stuffing an error into the chan or panicing if it fails), or |
70 // asynchronously (dispatching a goroutine which will then populate the error | 71 // asynchronously (dispatching a goroutine which will then populate the error |
71 // channel if needed). If it operates asynchronously, it has the option of | 72 // channel if needed). If it operates asynchronously, it has the option of |
72 // processing multiple work items at a time. | 73 // processing multiple work items at a time. |
73 // | 74 // |
74 // Lease would also take care of calling ModifyLease as necessary to ensure | 75 // Lease would also take care of calling ModifyLease as necessary to ensure |
75 // that each call to cb would have 'duration' amount of time to work on the | 76 // that each call to cb would have 'duration' amount of time to work on the |
76 // task, as well as releasing as many leased tasks as it can on a failure. | 77 // task, as well as releasing as many leased tasks as it can on a failure. |
77 | 78 |
| 79 // Lease leases tasks from a queue. |
| 80 // |
| 81 // leaseTime has seconds precision. The number of tasks fetched will be at most |
| 82 // maxTasks. |
| 83 func Lease(c context.Context, maxTasks int, queueName string, leaseTime time.Dur
ation) ([]*Task, error) { |
| 84 return Raw(c).Lease(maxTasks, queueName, leaseTime) |
| 85 } |
| 86 |
| 87 // LeaseByTag leases tasks from a queue, grouped by tag. |
| 88 // |
| 89 // If tag is empty, then the returned tasks are grouped by the tag of the task |
| 90 // with earliest ETA. |
| 91 // |
| 92 // leaseTime has seconds precision. The number of tasks fetched will be at most |
| 93 // maxTasks. |
| 94 func LeaseByTag(c context.Context, maxTasks int, queueName string, leaseTime tim
e.Duration, tag string) ([]*Task, error) { |
| 95 return Raw(c).LeaseByTag(maxTasks, queueName, leaseTime, tag) |
| 96 } |
| 97 |
| 98 // ModifyLease modifies the lease of a task. |
| 99 // |
| 100 // Used to request more processing time, or to abandon processing. leaseTime has |
| 101 // seconds precision and must not be negative. |
| 102 func ModifyLease(c context.Context, task *Task, queueName string, leaseTime time
.Duration) error { |
| 103 return Raw(c).ModifyLease(task, queueName, leaseTime) |
| 104 } |
| 105 |
78 // Purge purges all tasks form the named queue. | 106 // Purge purges all tasks form the named queue. |
79 func Purge(c context.Context, queueName string) error { | 107 func Purge(c context.Context, queueName string) error { |
80 return Raw(c).Purge(queueName) | 108 return Raw(c).Purge(queueName) |
81 } | 109 } |
82 | 110 |
83 // Stats returns Statistics instances for each of the named task queues. | 111 // Stats returns Statistics instances for each of the named task queues. |
84 // | 112 // |
85 // If only one task is provided its error will be returned directly. If more | 113 // If only one task is provided its error will be returned directly. If more |
86 // than one task is provided, an errors.MultiError will be returned in the | 114 // than one task is provided, an errors.MultiError will be returned in the |
87 // event of an error, with a given error index corresponding to the error | 115 // event of an error, with a given error index corresponding to the error |
(...skipping 15 matching lines...) Expand all Loading... |
103 } | 131 } |
104 } | 132 } |
105 return ret, err | 133 return ret, err |
106 } | 134 } |
107 | 135 |
108 // GetTestable returns a Testable for the current task queue service in c, or | 136 // GetTestable returns a Testable for the current task queue service in c, or |
109 // nil if it does not offer one. | 137 // nil if it does not offer one. |
110 func GetTestable(c context.Context) Testable { | 138 func GetTestable(c context.Context) Testable { |
111 return Raw(c).GetTestable() | 139 return Raw(c).GetTestable() |
112 } | 140 } |
OLD | NEW |