Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(139)

Side by Side Diff: filter/featureBreaker/tq.go

Issue 2512093002: Add support for Pull Queues to prod implementation. (Closed)
Patch Set: use time.Duration for leaseTime Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « filter/count/tq.go ('k') | impl/dummy/dummy.go » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package featureBreaker 5 package featureBreaker
6 6
7 import ( 7 import (
8 "time"
9
8 "golang.org/x/net/context" 10 "golang.org/x/net/context"
9 11
10 tq "github.com/luci/gae/service/taskqueue" 12 tq "github.com/luci/gae/service/taskqueue"
11 ) 13 )
12 14
13 type tqState struct { 15 type tqState struct {
14 *state 16 *state
15 17
16 tq tq.RawInterface 18 tq tq.RawInterface
17 } 19 }
18 20
19 var _ tq.RawInterface = (*tqState)(nil) 21 var _ tq.RawInterface = (*tqState)(nil)
20 22
21 func (t *tqState) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error { 23 func (t *tqState) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) error {
22 return t.run(func() (err error) { return t.tq.AddMulti(tasks, queueName, cb) }) 24 return t.run(func() (err error) { return t.tq.AddMulti(tasks, queueName, cb) })
23 } 25 }
24 26
25 func (t *tqState) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) e rror { 27 func (t *tqState) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) e rror {
26 return t.run(func() error { return t.tq.DeleteMulti(tasks, queueName, cb ) }) 28 return t.run(func() error { return t.tq.DeleteMulti(tasks, queueName, cb ) })
27 } 29 }
28 30
31 func (t *tqState) Lease(maxTasks int, queueName string, leaseTime time.Duration) (tasks []*tq.Task, err error) {
32 err = t.run(func() (err error) {
33 tasks, err = t.tq.Lease(maxTasks, queueName, leaseTime)
34 return
35 })
36 if err != nil {
37 tasks = nil
38 }
39 return
40 }
41
42 func (t *tqState) LeaseByTag(maxTasks int, queueName string, leaseTime time.Dura tion, tag string) (tasks []*tq.Task, err error) {
43 err = t.run(func() (err error) {
44 tasks, err = t.tq.LeaseByTag(maxTasks, queueName, leaseTime, tag )
45 return
46 })
47 if err != nil {
48 tasks = nil
49 }
50 return
51 }
52
53 func (t *tqState) ModifyLease(task *tq.Task, queueName string, leaseTime time.Du ration) error {
54 return t.run(func() error { return t.tq.ModifyLease(task, queueName, lea seTime) })
55 }
56
29 func (t *tqState) Purge(queueName string) error { 57 func (t *tqState) Purge(queueName string) error {
30 return t.run(func() error { return t.tq.Purge(queueName) }) 58 return t.run(func() error { return t.tq.Purge(queueName) })
31 } 59 }
32 60
33 func (t *tqState) Stats(queueNames []string, cb tq.RawStatsCB) error { 61 func (t *tqState) Stats(queueNames []string, cb tq.RawStatsCB) error {
34 return t.run(func() error { return t.tq.Stats(queueNames, cb) }) 62 return t.run(func() error { return t.tq.Stats(queueNames, cb) })
35 } 63 }
36 64
37 func (t *tqState) GetTestable() tq.Testable { 65 func (t *tqState) GetTestable() tq.Testable {
38 return t.tq.GetTestable() 66 return t.tq.GetTestable()
39 } 67 }
40 68
41 // FilterTQ installs a featureBreaker TaskQueue filter in the context. 69 // FilterTQ installs a featureBreaker TaskQueue filter in the context.
42 func FilterTQ(c context.Context, defaultError error) (context.Context, FeatureBr eaker) { 70 func FilterTQ(c context.Context, defaultError error) (context.Context, FeatureBr eaker) {
43 state := newState(defaultError) 71 state := newState(defaultError)
44 return tq.AddRawFilters(c, func(ic context.Context, tq tq.RawInterface) tq.RawInterface { 72 return tq.AddRawFilters(c, func(ic context.Context, tq tq.RawInterface) tq.RawInterface {
45 return &tqState{state, tq} 73 return &tqState{state, tq}
46 }), state 74 }), state
47 } 75 }
OLDNEW
« no previous file with comments | « filter/count/tq.go ('k') | impl/dummy/dummy.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698