OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package featureBreaker | |
6 | |
7 import ( | |
8 "golang.org/x/net/context" | |
9 | |
10 tq "github.com/luci/gae/service/taskqueue" | |
11 ) | |
12 | |
13 type tqState struct { | |
14 *state | |
15 | |
16 tq tq.Interface | |
17 } | |
18 | |
19 var _ tq.Interface = (*tqState)(nil) | |
20 | |
21 func (t *tqState) Add(task *tq.Task, queueName string) (ret *tq.Task, err error)
{ | |
22 err = t.run(func() (err error) { | |
23 ret, err = t.tq.Add(task, queueName) | |
24 return | |
25 }) | |
26 return | |
27 } | |
28 | |
29 func (t *tqState) Delete(task *tq.Task, queueName string) error { | |
30 return t.run(func() error { | |
31 return t.tq.Delete(task, queueName) | |
32 }) | |
33 } | |
34 | |
35 func (t *tqState) AddMulti(tasks []*tq.Task, queueName string) (ret []*tq.Task,
err error) { | |
36 err = t.run(func() (err error) { | |
37 ret, err = t.tq.AddMulti(tasks, queueName) | |
38 return | |
39 }) | |
40 return | |
41 } | |
42 | |
43 func (t *tqState) DeleteMulti(tasks []*tq.Task, queueName string) error { | |
44 return t.run(func() error { | |
45 return t.tq.DeleteMulti(tasks, queueName) | |
46 }) | |
47 } | |
48 | |
49 func (t *tqState) Lease(maxTasks int, queueName string, leaseTime int) (ret []*t
q.Task, err error) { | |
50 err = t.run(func() (err error) { | |
51 ret, err = t.tq.Lease(maxTasks, queueName, leaseTime) | |
52 return | |
53 }) | |
54 return | |
55 } | |
56 | |
57 func (t *tqState) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag
string) (ret []*tq.Task, err error) { | |
58 err = t.run(func() (err error) { | |
59 ret, err = t.tq.LeaseByTag(maxTasks, queueName, leaseTime, tag) | |
60 return | |
61 }) | |
62 return | |
63 } | |
64 | |
65 func (t *tqState) ModifyLease(task *tq.Task, queueName string, leaseTime int) er
ror { | |
66 return t.run(func() error { | |
67 return t.tq.ModifyLease(task, queueName, leaseTime) | |
68 }) | |
69 } | |
70 | |
71 func (t *tqState) Purge(queueName string) error { | |
72 return t.run(func() error { | |
73 return t.tq.Purge(queueName) | |
74 }) | |
75 } | |
76 | |
77 func (t *tqState) QueueStats(queueNames []string) (ret []tq.Statistics, err erro
r) { | |
78 err = t.run(func() (err error) { | |
79 ret, err = t.tq.QueueStats(queueNames) | |
80 return | |
81 }) | |
82 return | |
83 } | |
84 | |
85 // FilterTQ installs a counter TaskQueue filter in the context. | |
86 func FilterTQ(c context.Context, defaultError error) (context.Context, FeatureBr
eaker) { | |
87 state := newState(defaultError) | |
88 return tq.AddFilters(c, func(ic context.Context, tq tq.Interface) tq.Int
erface { | |
89 return &tqState{state, tq} | |
90 }), state | |
91 } | |
OLD | NEW |