| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package featureBreaker | 5 package featureBreaker |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "golang.org/x/net/context" | 8 "golang.org/x/net/context" |
| 9 | 9 |
| 10 tq "github.com/luci/gae/service/taskqueue" | 10 tq "github.com/luci/gae/service/taskqueue" |
| 11 ) | 11 ) |
| 12 | 12 |
| 13 type tqState struct { | 13 type tqState struct { |
| 14 *state | 14 *state |
| 15 | 15 |
| 16 » tq tq.Interface | 16 » tq tq.RawInterface |
| 17 } | 17 } |
| 18 | 18 |
| 19 var _ tq.Interface = (*tqState)(nil) | 19 var _ tq.RawInterface = (*tqState)(nil) |
| 20 | 20 |
| 21 func (t *tqState) Add(task *tq.Task, queueName string) (ret *tq.Task, err error)
{ | 21 func (t *tqState) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB)
error { |
| 22 » err = t.run(func() (err error) { | 22 » return t.run(func() (err error) { return t.tq.AddMulti(tasks, queueName,
cb) }) |
| 23 » » ret, err = t.tq.Add(task, queueName) | |
| 24 » » return | |
| 25 » }) | |
| 26 » return | |
| 27 } | 23 } |
| 28 | 24 |
| 29 func (t *tqState) Delete(task *tq.Task, queueName string) error { | 25 func (t *tqState) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) e
rror { |
| 30 » return t.run(func() error { | 26 » return t.run(func() error { return t.tq.DeleteMulti(tasks, queueName, cb
) }) |
| 31 » » return t.tq.Delete(task, queueName) | |
| 32 » }) | |
| 33 } | |
| 34 | |
| 35 func (t *tqState) AddMulti(tasks []*tq.Task, queueName string) (ret []*tq.Task,
err error) { | |
| 36 » err = t.run(func() (err error) { | |
| 37 » » ret, err = t.tq.AddMulti(tasks, queueName) | |
| 38 » » return | |
| 39 » }) | |
| 40 » return | |
| 41 } | |
| 42 | |
| 43 func (t *tqState) DeleteMulti(tasks []*tq.Task, queueName string) error { | |
| 44 » return t.run(func() error { | |
| 45 » » return t.tq.DeleteMulti(tasks, queueName) | |
| 46 » }) | |
| 47 } | |
| 48 | |
| 49 func (t *tqState) Lease(maxTasks int, queueName string, leaseTime int) (ret []*t
q.Task, err error) { | |
| 50 » err = t.run(func() (err error) { | |
| 51 » » ret, err = t.tq.Lease(maxTasks, queueName, leaseTime) | |
| 52 » » return | |
| 53 » }) | |
| 54 » return | |
| 55 } | |
| 56 | |
| 57 func (t *tqState) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag
string) (ret []*tq.Task, err error) { | |
| 58 » err = t.run(func() (err error) { | |
| 59 » » ret, err = t.tq.LeaseByTag(maxTasks, queueName, leaseTime, tag) | |
| 60 » » return | |
| 61 » }) | |
| 62 » return | |
| 63 } | |
| 64 | |
| 65 func (t *tqState) ModifyLease(task *tq.Task, queueName string, leaseTime int) er
ror { | |
| 66 » return t.run(func() error { | |
| 67 » » return t.tq.ModifyLease(task, queueName, leaseTime) | |
| 68 » }) | |
| 69 } | 27 } |
| 70 | 28 |
| 71 func (t *tqState) Purge(queueName string) error { | 29 func (t *tqState) Purge(queueName string) error { |
| 72 » return t.run(func() error { | 30 » return t.run(func() error { return t.tq.Purge(queueName) }) |
| 73 » » return t.tq.Purge(queueName) | |
| 74 » }) | |
| 75 } | 31 } |
| 76 | 32 |
| 77 func (t *tqState) QueueStats(queueNames []string) (ret []tq.Statistics, err erro
r) { | 33 func (t *tqState) Stats(queueNames []string, cb tq.RawStatsCB) error { |
| 78 » err = t.run(func() (err error) { | 34 » return t.run(func() error { return t.tq.Stats(queueNames, cb) }) |
| 79 » » ret, err = t.tq.QueueStats(queueNames) | |
| 80 » » return | |
| 81 » }) | |
| 82 » return | |
| 83 } | 35 } |
| 84 | 36 |
| 85 // FilterTQ installs a counter TaskQueue filter in the context. | 37 // FilterTQ installs a counter TaskQueue filter in the context. |
| 86 func FilterTQ(c context.Context, defaultError error) (context.Context, FeatureBr
eaker) { | 38 func FilterTQ(c context.Context, defaultError error) (context.Context, FeatureBr
eaker) { |
| 87 state := newState(defaultError) | 39 state := newState(defaultError) |
| 88 » return tq.AddFilters(c, func(ic context.Context, tq tq.Interface) tq.Int
erface { | 40 » return tq.AddRawFilters(c, func(ic context.Context, tq tq.RawInterface)
tq.RawInterface { |
| 89 return &tqState{state, tq} | 41 return &tqState{state, tq} |
| 90 }), state | 42 }), state |
| 91 } | 43 } |
| OLD | NEW |