OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package prod | 5 package prod |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "reflect" | 9 "reflect" |
10 | 10 |
11 tq "github.com/luci/gae/service/taskqueue" | 11 tq "github.com/luci/gae/service/taskqueue" |
12 "github.com/luci/luci-go/common/errors" | |
13 "golang.org/x/net/context" | 12 "golang.org/x/net/context" |
| 13 "google.golang.org/appengine" |
14 "google.golang.org/appengine/taskqueue" | 14 "google.golang.org/appengine/taskqueue" |
15 ) | 15 ) |
16 | 16 |
17 // useTQ adds a gae.TaskQueue implementation to context, accessible | 17 // useTQ adds a gae.TaskQueue implementation to context, accessible |
18 // by gae.GetTQ(c) | 18 // by gae.GetTQ(c) |
19 func useTQ(c context.Context) context.Context { | 19 func useTQ(c context.Context) context.Context { |
20 » return tq.SetFactory(c, func(ci context.Context) tq.Interface { | 20 » return tq.SetRawFactory(c, func(ci context.Context) tq.RawInterface { |
21 return tqImpl{ci} | 21 return tqImpl{ci} |
22 }) | 22 }) |
23 } | 23 } |
24 | 24 |
25 type tqImpl struct { | 25 type tqImpl struct { |
26 context.Context | 26 context.Context |
27 } | 27 } |
28 | 28 |
29 func init() { | 29 func init() { |
30 const taskExpectedFields = 9 | 30 const taskExpectedFields = 9 |
31 // Runtime-assert that the number of fields in the Task structs is 9, to | 31 // Runtime-assert that the number of fields in the Task structs is 9, to |
32 // avoid missing additional fields if they're added later. | 32 // avoid missing additional fields if they're added later. |
33 // all other type assertions are statically enforced by o2n() and tqF2R(
) | 33 // all other type assertions are statically enforced by o2n() and tqF2R(
) |
34 | 34 |
35 oldType := reflect.TypeOf((*taskqueue.Task)(nil)) | 35 oldType := reflect.TypeOf((*taskqueue.Task)(nil)) |
36 newType := reflect.TypeOf((*tq.Task)(nil)) | 36 newType := reflect.TypeOf((*tq.Task)(nil)) |
37 | 37 |
38 if oldType.NumField() != newType.NumField() || | 38 if oldType.NumField() != newType.NumField() || |
39 oldType.NumField() != taskExpectedFields { | 39 oldType.NumField() != taskExpectedFields { |
40 panic(fmt.Errorf( | 40 panic(fmt.Errorf( |
41 "prod/taskqueue:init() field count differs: %v, %v", | 41 "prod/taskqueue:init() field count differs: %v, %v", |
42 oldType, newType)) | 42 oldType, newType)) |
43 } | 43 } |
44 } | 44 } |
45 | 45 |
46 // tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a | 46 // tqR2F (TQ real-to-fake) converts a *taskqueue.Task to a *tq.Task. |
47 // *tq.Task, and passes through an error. | 47 func tqR2F(o *taskqueue.Task) *tq.Task { |
48 func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) { | 48 » if o == nil { |
49 » if err != nil { | 49 » » return nil |
50 » » return nil, err | |
51 } | 50 } |
52 n := tq.Task{} | 51 n := tq.Task{} |
53 n.Path = o.Path | 52 n.Path = o.Path |
54 n.Payload = o.Payload | 53 n.Payload = o.Payload |
55 n.Header = o.Header | 54 n.Header = o.Header |
56 n.Method = o.Method | 55 n.Method = o.Method |
57 n.Name = o.Name | 56 n.Name = o.Name |
58 n.Delay = o.Delay | 57 n.Delay = o.Delay |
59 n.ETA = o.ETA | 58 n.ETA = o.ETA |
60 n.RetryCount = o.RetryCount | 59 n.RetryCount = o.RetryCount |
61 n.RetryOptions = (*tq.RetryOptions)(o.RetryOptions) | 60 n.RetryOptions = (*tq.RetryOptions)(o.RetryOptions) |
62 » return &n, nil | 61 » return &n |
63 } | 62 } |
64 | 63 |
65 // tqF2R (TQ fake-to-real) converts a *tq.Task to a *taskqueue.Task. | 64 // tqF2R (TQ fake-to-real) converts a *tq.Task to a *taskqueue.Task. |
66 func tqF2R(n *tq.Task) *taskqueue.Task { | 65 func tqF2R(n *tq.Task) *taskqueue.Task { |
67 o := taskqueue.Task{} | 66 o := taskqueue.Task{} |
68 o.Path = n.Path | 67 o.Path = n.Path |
69 o.Payload = n.Payload | 68 o.Payload = n.Payload |
70 o.Header = n.Header | 69 o.Header = n.Header |
71 o.Method = n.Method | 70 o.Method = n.Method |
72 o.Name = n.Name | 71 o.Name = n.Name |
73 o.Delay = n.Delay | 72 o.Delay = n.Delay |
74 o.ETA = n.ETA | 73 o.ETA = n.ETA |
75 o.RetryCount = n.RetryCount | 74 o.RetryCount = n.RetryCount |
76 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions) | 75 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions) |
77 return &o | 76 return &o |
78 } | 77 } |
79 | 78 |
80 // tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of | |
81 // *taskqueue.Task to a slice of *tq.Task | |
82 func tqMR2FErr(os []*taskqueue.Task, err error) ([]*tq.Task, error) { | |
83 if err != nil { | |
84 return nil, errors.Fix(err) | |
85 } | |
86 ret := make([]*tq.Task, len(os)) | |
87 for i, t := range os { | |
88 ret[i], _ = tqR2FErr(t, nil) | |
89 } | |
90 return ret, nil | |
91 } | |
92 | |
93 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task. | 79 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task. |
94 func tqMF2R(ns []*tq.Task) []*taskqueue.Task { | 80 func tqMF2R(ns []*tq.Task) []*taskqueue.Task { |
95 ret := make([]*taskqueue.Task, len(ns)) | 81 ret := make([]*taskqueue.Task, len(ns)) |
96 for i, t := range ns { | 82 for i, t := range ns { |
97 ret[i] = tqF2R(t) | 83 ret[i] = tqF2R(t) |
98 } | 84 } |
99 return ret | 85 return ret |
100 } | 86 } |
101 | 87 |
102 //////// TQSingleReadWriter | 88 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string, cb tq.RawTaskCB) er
ror { |
103 | 89 » realTasks, err := taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName
) |
104 func (t tqImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) { | 90 » if err != nil { |
105 » return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName)) | 91 » » if me, ok := err.(appengine.MultiError); ok { |
106 } | 92 » » » for i, err := range me { |
107 func (t tqImpl) Delete(task *tq.Task, queueName string) error { | 93 » » » » tsk := (*taskqueue.Task)(nil) |
108 » return taskqueue.Delete(t.Context, tqF2R(task), queueName) | 94 » » » » if realTasks != nil { |
| 95 » » » » » tsk = realTasks[i] |
| 96 » » » » } |
| 97 » » » » cb(tqR2F(tsk), err) |
| 98 » » » } |
| 99 » » » err = nil |
| 100 » » } |
| 101 » } else { |
| 102 » » for _, tsk := range realTasks { |
| 103 » » » cb(tqR2F(tsk), nil) |
| 104 » » } |
| 105 » } |
| 106 » return err |
109 } | 107 } |
110 | 108 |
111 //////// TQMultiReadWriter | 109 func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string, cb tq.RawCB) err
or { |
112 | 110 » err := taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueName) |
113 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error)
{ | 111 » if me, ok := err.(appengine.MultiError); ok { |
114 » return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)
) | 112 » » for _, err := range me { |
| 113 » » » cb(err) |
| 114 » » } |
| 115 » » err = nil |
| 116 » } |
| 117 » return err |
115 } | 118 } |
116 func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { | |
117 return errors.Fix(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queueN
ame)) | |
118 } | |
119 | |
120 //////// TQLeaser | |
121 | |
122 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task
, error) { | |
123 return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTi
me)) | |
124 } | |
125 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st
ring) ([]*tq.Task, error) { | |
126 return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, le
aseTime, tag)) | |
127 } | |
128 func (t tqImpl) ModifyLease(task *tq.Task, queueName string, leaseTime int) erro
r { | |
129 return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTim
e) | |
130 } | |
131 | |
132 //////// TQPurger | |
133 | 119 |
134 func (t tqImpl) Purge(queueName string) error { | 120 func (t tqImpl) Purge(queueName string) error { |
135 return taskqueue.Purge(t.Context, queueName) | 121 return taskqueue.Purge(t.Context, queueName) |
136 } | 122 } |
137 | 123 |
138 //////// TQStatter | 124 func (t tqImpl) Stats(queueNames []string, cb tq.RawStatsCB) error { |
139 | |
140 func (t tqImpl) QueueStats(queueNames []string) ([]tq.Statistics, error) { | |
141 stats, err := taskqueue.QueueStats(t.Context, queueNames) | 125 stats, err := taskqueue.QueueStats(t.Context, queueNames) |
142 if err != nil { | 126 if err != nil { |
143 » » return nil, err | 127 » » return err |
144 } | 128 } |
145 » ret := make([]tq.Statistics, len(stats)) | 129 » for _, s := range stats { |
146 » for i, s := range stats { | 130 » » cb((*tq.Statistics)(&s), nil) |
147 » » ret[i] = tq.Statistics(s) | |
148 } | 131 } |
149 » return ret, nil | 132 » return nil |
150 } | 133 } |
OLD | NEW |