OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package featureBreaker | 5 package featureBreaker |
6 | 6 |
7 import ( | 7 import ( |
8 "golang.org/x/net/context" | 8 "golang.org/x/net/context" |
9 | 9 |
10 » "github.com/luci/gae" | 10 » tq "github.com/luci/gae/service/taskqueue" |
11 ) | 11 ) |
12 | 12 |
13 type tqState struct { | 13 type tqState struct { |
14 *state | 14 *state |
15 | 15 |
16 » tq gae.TaskQueue | 16 » tq tq.Interface |
17 } | 17 } |
18 | 18 |
19 var _ gae.TaskQueue = (*tqState)(nil) | 19 var _ tq.Interface = (*tqState)(nil) |
20 | 20 |
21 func (t *tqState) Add(task *gae.TQTask, queueName string) (ret *gae.TQTask, err
error) { | 21 func (t *tqState) Add(task *tq.Task, queueName string) (ret *tq.Task, err error)
{ |
22 err = t.run(func() (err error) { | 22 err = t.run(func() (err error) { |
23 ret, err = t.tq.Add(task, queueName) | 23 ret, err = t.tq.Add(task, queueName) |
24 return | 24 return |
25 }) | 25 }) |
26 return | 26 return |
27 } | 27 } |
28 | 28 |
29 func (t *tqState) Delete(task *gae.TQTask, queueName string) error { | 29 func (t *tqState) Delete(task *tq.Task, queueName string) error { |
30 return t.run(func() error { | 30 return t.run(func() error { |
31 return t.tq.Delete(task, queueName) | 31 return t.tq.Delete(task, queueName) |
32 }) | 32 }) |
33 } | 33 } |
34 | 34 |
35 func (t *tqState) AddMulti(tasks []*gae.TQTask, queueName string) (ret []*gae.TQ
Task, err error) { | 35 func (t *tqState) AddMulti(tasks []*tq.Task, queueName string) (ret []*tq.Task,
err error) { |
36 err = t.run(func() (err error) { | 36 err = t.run(func() (err error) { |
37 ret, err = t.tq.AddMulti(tasks, queueName) | 37 ret, err = t.tq.AddMulti(tasks, queueName) |
38 return | 38 return |
39 }) | 39 }) |
40 return | 40 return |
41 } | 41 } |
42 | 42 |
43 func (t *tqState) DeleteMulti(tasks []*gae.TQTask, queueName string) error { | 43 func (t *tqState) DeleteMulti(tasks []*tq.Task, queueName string) error { |
44 return t.run(func() error { | 44 return t.run(func() error { |
45 return t.tq.DeleteMulti(tasks, queueName) | 45 return t.tq.DeleteMulti(tasks, queueName) |
46 }) | 46 }) |
47 } | 47 } |
48 | 48 |
49 func (t *tqState) Lease(maxTasks int, queueName string, leaseTime int) (ret []*g
ae.TQTask, err error) { | 49 func (t *tqState) Lease(maxTasks int, queueName string, leaseTime int) (ret []*t
q.Task, err error) { |
50 err = t.run(func() (err error) { | 50 err = t.run(func() (err error) { |
51 ret, err = t.tq.Lease(maxTasks, queueName, leaseTime) | 51 ret, err = t.tq.Lease(maxTasks, queueName, leaseTime) |
52 return | 52 return |
53 }) | 53 }) |
54 return | 54 return |
55 } | 55 } |
56 | 56 |
57 func (t *tqState) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag
string) (ret []*gae.TQTask, err error) { | 57 func (t *tqState) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag
string) (ret []*tq.Task, err error) { |
58 err = t.run(func() (err error) { | 58 err = t.run(func() (err error) { |
59 ret, err = t.tq.LeaseByTag(maxTasks, queueName, leaseTime, tag) | 59 ret, err = t.tq.LeaseByTag(maxTasks, queueName, leaseTime, tag) |
60 return | 60 return |
61 }) | 61 }) |
62 return | 62 return |
63 } | 63 } |
64 | 64 |
65 func (t *tqState) ModifyLease(task *gae.TQTask, queueName string, leaseTime int)
error { | 65 func (t *tqState) ModifyLease(task *tq.Task, queueName string, leaseTime int) er
ror { |
66 return t.run(func() error { | 66 return t.run(func() error { |
67 return t.tq.ModifyLease(task, queueName, leaseTime) | 67 return t.tq.ModifyLease(task, queueName, leaseTime) |
68 }) | 68 }) |
69 } | 69 } |
70 | 70 |
71 func (t *tqState) Purge(queueName string) error { | 71 func (t *tqState) Purge(queueName string) error { |
72 return t.run(func() error { | 72 return t.run(func() error { |
73 return t.tq.Purge(queueName) | 73 return t.tq.Purge(queueName) |
74 }) | 74 }) |
75 } | 75 } |
76 | 76 |
77 func (t *tqState) QueueStats(queueNames []string) (ret []gae.TQStatistics, err e
rror) { | 77 func (t *tqState) QueueStats(queueNames []string) (ret []tq.Statistics, err erro
r) { |
78 err = t.run(func() (err error) { | 78 err = t.run(func() (err error) { |
79 ret, err = t.tq.QueueStats(queueNames) | 79 ret, err = t.tq.QueueStats(queueNames) |
80 return | 80 return |
81 }) | 81 }) |
82 return | 82 return |
83 } | 83 } |
84 | 84 |
85 // FilterTQ installs a counter TaskQueue filter in the context. | 85 // FilterTQ installs a counter TaskQueue filter in the context. |
86 func FilterTQ(c context.Context, defaultError error) (context.Context, FeatureBr
eaker) { | 86 func FilterTQ(c context.Context, defaultError error) (context.Context, FeatureBr
eaker) { |
87 state := newState(defaultError) | 87 state := newState(defaultError) |
88 » return gae.AddTQFilters(c, func(ic context.Context, tq gae.TaskQueue) ga
e.TaskQueue { | 88 » return tq.AddFilters(c, func(ic context.Context, tq tq.Interface) tq.Int
erface { |
89 return &tqState{state, tq} | 89 return &tqState{state, tq} |
90 }), state | 90 }), state |
91 } | 91 } |
OLD | NEW |