OLD | NEW |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
4 | 4 |
5 package prod | 5 package prod |
6 | 6 |
7 import ( | 7 import ( |
8 "fmt" | 8 "fmt" |
9 "reflect" | 9 "reflect" |
10 | 10 |
11 "github.com/luci/gae" | 11 "github.com/luci/gae" |
| 12 tq "github.com/luci/gae/service/taskqueue" |
12 "golang.org/x/net/context" | 13 "golang.org/x/net/context" |
13 "google.golang.org/appengine/taskqueue" | 14 "google.golang.org/appengine/taskqueue" |
14 ) | 15 ) |
15 | 16 |
16 // useTQ adds a gae.TaskQueue implementation to context, accessible | 17 // useTQ adds a gae.TaskQueue implementation to context, accessible |
17 // by gae.GetTQ(c) | 18 // by gae.GetTQ(c) |
18 func useTQ(c context.Context) context.Context { | 19 func useTQ(c context.Context) context.Context { |
19 » return gae.SetTQFactory(c, func(ci context.Context) gae.TaskQueue { | 20 » return tq.SetFactory(c, func(ci context.Context) tq.Interface { |
20 return tqImpl{ci} | 21 return tqImpl{ci} |
21 }) | 22 }) |
22 } | 23 } |
23 | 24 |
24 type tqImpl struct { | 25 type tqImpl struct { |
25 context.Context | 26 context.Context |
26 } | 27 } |
27 | 28 |
28 func init() { | 29 func init() { |
29 » const TASK_EXPECTED_FIELDS = 9 | 30 » const taskExpectedFields = 9 |
30 // Runtime-assert that the number of fields in the Task structs is 9, to | 31 // Runtime-assert that the number of fields in the Task structs is 9, to |
31 // avoid missing additional fields if they're added later. | 32 // avoid missing additional fields if they're added later. |
32 // all other type assertions are statically enforced by o2n() and tqF2R(
) | 33 // all other type assertions are statically enforced by o2n() and tqF2R(
) |
33 | 34 |
34 oldType := reflect.TypeOf((*taskqueue.Task)(nil)) | 35 oldType := reflect.TypeOf((*taskqueue.Task)(nil)) |
35 » newType := reflect.TypeOf((*gae.TQTask)(nil)) | 36 » newType := reflect.TypeOf((*tq.Task)(nil)) |
36 | 37 |
37 if oldType.NumField() != newType.NumField() || | 38 if oldType.NumField() != newType.NumField() || |
38 » » oldType.NumField() != TASK_EXPECTED_FIELDS { | 39 » » oldType.NumField() != taskExpectedFields { |
39 panic(fmt.Errorf( | 40 panic(fmt.Errorf( |
40 "prod/taskqueue:init() field count differs: %v, %v", | 41 "prod/taskqueue:init() field count differs: %v, %v", |
41 oldType, newType)) | 42 oldType, newType)) |
42 } | 43 } |
43 } | 44 } |
44 | 45 |
45 // tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a | 46 // tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a |
46 // *gae.TQTask, and passes through an error. | 47 // *tq.Task, and passes through an error. |
47 func tqR2FErr(o *taskqueue.Task, err error) (*gae.TQTask, error) { | 48 func tqR2FErr(o *taskqueue.Task, err error) (*tq.Task, error) { |
48 if err != nil { | 49 if err != nil { |
49 return nil, err | 50 return nil, err |
50 } | 51 } |
51 » n := gae.TQTask{} | 52 » n := tq.Task{} |
52 n.Path = o.Path | 53 n.Path = o.Path |
53 n.Payload = o.Payload | 54 n.Payload = o.Payload |
54 n.Header = o.Header | 55 n.Header = o.Header |
55 n.Method = o.Method | 56 n.Method = o.Method |
56 n.Name = o.Name | 57 n.Name = o.Name |
57 n.Delay = o.Delay | 58 n.Delay = o.Delay |
58 n.ETA = o.ETA | 59 n.ETA = o.ETA |
59 n.RetryCount = o.RetryCount | 60 n.RetryCount = o.RetryCount |
60 » n.RetryOptions = (*gae.TQRetryOptions)(o.RetryOptions) | 61 » n.RetryOptions = (*tq.RetryOptions)(o.RetryOptions) |
61 return &n, nil | 62 return &n, nil |
62 } | 63 } |
63 | 64 |
64 // tqF2R (TQ fake-to-real) converts a *gae.TQTask to a *taskqueue.Task. | 65 // tqF2R (TQ fake-to-real) converts a *tq.Task to a *taskqueue.Task. |
65 func tqF2R(n *gae.TQTask) *taskqueue.Task { | 66 func tqF2R(n *tq.Task) *taskqueue.Task { |
66 o := taskqueue.Task{} | 67 o := taskqueue.Task{} |
67 o.Path = n.Path | 68 o.Path = n.Path |
68 o.Payload = n.Payload | 69 o.Payload = n.Payload |
69 o.Header = n.Header | 70 o.Header = n.Header |
70 o.Method = n.Method | 71 o.Method = n.Method |
71 o.Name = n.Name | 72 o.Name = n.Name |
72 o.Delay = n.Delay | 73 o.Delay = n.Delay |
73 o.ETA = n.ETA | 74 o.ETA = n.ETA |
74 o.RetryCount = n.RetryCount | 75 o.RetryCount = n.RetryCount |
75 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions) | 76 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions) |
76 return &o | 77 return &o |
77 } | 78 } |
78 | 79 |
79 // tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of | 80 // tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of |
80 // *taskqueue.Task to a slice of *gae.TQTask | 81 // *taskqueue.Task to a slice of *tq.Task |
81 func tqMR2FErr(os []*taskqueue.Task, err error) ([]*gae.TQTask, error) { | 82 func tqMR2FErr(os []*taskqueue.Task, err error) ([]*tq.Task, error) { |
82 if err != nil { | 83 if err != nil { |
83 return nil, gae.FixError(err) | 84 return nil, gae.FixError(err) |
84 } | 85 } |
85 » ret := make([]*gae.TQTask, len(os)) | 86 » ret := make([]*tq.Task, len(os)) |
86 for i, t := range os { | 87 for i, t := range os { |
87 ret[i], _ = tqR2FErr(t, nil) | 88 ret[i], _ = tqR2FErr(t, nil) |
88 } | 89 } |
89 return ret, nil | 90 return ret, nil |
90 } | 91 } |
91 | 92 |
92 // tqMF2R (TQ multi-fake-to-real) converts []*gae.TQTask to []*taskqueue.Task. | 93 // tqMF2R (TQ multi-fake-to-real) converts []*tq.Task to []*taskqueue.Task. |
93 func tqMF2R(ns []*gae.TQTask) []*taskqueue.Task { | 94 func tqMF2R(ns []*tq.Task) []*taskqueue.Task { |
94 ret := make([]*taskqueue.Task, len(ns)) | 95 ret := make([]*taskqueue.Task, len(ns)) |
95 for i, t := range ns { | 96 for i, t := range ns { |
96 ret[i] = tqF2R(t) | 97 ret[i] = tqF2R(t) |
97 } | 98 } |
98 return ret | 99 return ret |
99 } | 100 } |
100 | 101 |
101 //////// TQSingleReadWriter | 102 //////// TQSingleReadWriter |
102 | 103 |
103 func (t tqImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) { | 104 func (t tqImpl) Add(task *tq.Task, queueName string) (*tq.Task, error) { |
104 return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName)) | 105 return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName)) |
105 } | 106 } |
106 func (t tqImpl) Delete(task *gae.TQTask, queueName string) error { | 107 func (t tqImpl) Delete(task *tq.Task, queueName string) error { |
107 return taskqueue.Delete(t.Context, tqF2R(task), queueName) | 108 return taskqueue.Delete(t.Context, tqF2R(task), queueName) |
108 } | 109 } |
109 | 110 |
110 //////// TQMultiReadWriter | 111 //////// TQMultiReadWriter |
111 | 112 |
112 func (t tqImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask,
error) { | 113 func (t tqImpl) AddMulti(tasks []*tq.Task, queueName string) ([]*tq.Task, error)
{ |
113 return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)
) | 114 return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)
) |
114 } | 115 } |
115 func (t tqImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error { | 116 func (t tqImpl) DeleteMulti(tasks []*tq.Task, queueName string) error { |
116 return gae.FixError(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queu
eName)) | 117 return gae.FixError(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queu
eName)) |
117 } | 118 } |
118 | 119 |
119 //////// TQLeaser | 120 //////// TQLeaser |
120 | 121 |
121 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*gae.TQT
ask, error) { | 122 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*tq.Task
, error) { |
122 return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTi
me)) | 123 return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTi
me)) |
123 } | 124 } |
124 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st
ring) ([]*gae.TQTask, error) { | 125 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st
ring) ([]*tq.Task, error) { |
125 return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, le
aseTime, tag)) | 126 return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, le
aseTime, tag)) |
126 } | 127 } |
127 func (t tqImpl) ModifyLease(task *gae.TQTask, queueName string, leaseTime int) e
rror { | 128 func (t tqImpl) ModifyLease(task *tq.Task, queueName string, leaseTime int) erro
r { |
128 return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTim
e) | 129 return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTim
e) |
129 } | 130 } |
130 | 131 |
131 //////// TQPurger | 132 //////// TQPurger |
132 | 133 |
133 func (t tqImpl) Purge(queueName string) error { | 134 func (t tqImpl) Purge(queueName string) error { |
134 return taskqueue.Purge(t.Context, queueName) | 135 return taskqueue.Purge(t.Context, queueName) |
135 } | 136 } |
136 | 137 |
137 //////// TQStatter | 138 //////// TQStatter |
138 | 139 |
139 func (t tqImpl) QueueStats(queueNames []string) ([]gae.TQStatistics, error) { | 140 func (t tqImpl) QueueStats(queueNames []string) ([]tq.Statistics, error) { |
140 stats, err := taskqueue.QueueStats(t.Context, queueNames) | 141 stats, err := taskqueue.QueueStats(t.Context, queueNames) |
141 if err != nil { | 142 if err != nil { |
142 return nil, err | 143 return nil, err |
143 } | 144 } |
144 » ret := make([]gae.TQStatistics, len(stats)) | 145 » ret := make([]tq.Statistics, len(stats)) |
145 for i, s := range stats { | 146 for i, s := range stats { |
146 » » ret[i] = gae.TQStatistics(s) | 147 » » ret[i] = tq.Statistics(s) |
147 } | 148 } |
148 return ret, nil | 149 return ret, nil |
149 } | 150 } |
OLD | NEW |