OLD | NEW |
| (Empty) |
1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 package prod | |
6 | |
7 import ( | |
8 "fmt" | |
9 "reflect" | |
10 | |
11 "github.com/luci/gae" | |
12 "golang.org/x/net/context" | |
13 "google.golang.org/appengine/taskqueue" | |
14 ) | |
15 | |
16 // useTQ adds a gae.TaskQueue implementation to context, accessible | |
17 // by gae.GetTQ(c) | |
18 func useTQ(c context.Context) context.Context { | |
19 return gae.SetTQFactory(c, func(ci context.Context) gae.TaskQueue { | |
20 return tqImpl{ci} | |
21 }) | |
22 } | |
23 | |
24 type tqImpl struct { | |
25 context.Context | |
26 } | |
27 | |
28 func init() { | |
29 const TASK_EXPECTED_FIELDS = 9 | |
30 // Runtime-assert that the number of fields in the Task structs is 9, to | |
31 // avoid missing additional fields if they're added later. | |
32 // all other type assertions are statically enforced by o2n() and tqF2R(
) | |
33 | |
34 oldType := reflect.TypeOf((*taskqueue.Task)(nil)) | |
35 newType := reflect.TypeOf((*gae.TQTask)(nil)) | |
36 | |
37 if oldType.NumField() != newType.NumField() || | |
38 oldType.NumField() != TASK_EXPECTED_FIELDS { | |
39 panic(fmt.Errorf( | |
40 "prod/taskqueue:init() field count differs: %v, %v", | |
41 oldType, newType)) | |
42 } | |
43 } | |
44 | |
45 // tqR2FErr (TQ real-to-fake w/ error) converts a *taskqueue.Task to a | |
46 // *gae.TQTask, and passes through an error. | |
47 func tqR2FErr(o *taskqueue.Task, err error) (*gae.TQTask, error) { | |
48 if err != nil { | |
49 return nil, err | |
50 } | |
51 n := gae.TQTask{} | |
52 n.Path = o.Path | |
53 n.Payload = o.Payload | |
54 n.Header = o.Header | |
55 n.Method = o.Method | |
56 n.Name = o.Name | |
57 n.Delay = o.Delay | |
58 n.ETA = o.ETA | |
59 n.RetryCount = o.RetryCount | |
60 n.RetryOptions = (*gae.TQRetryOptions)(o.RetryOptions) | |
61 return &n, nil | |
62 } | |
63 | |
64 // tqF2R (TQ fake-to-real) converts a *gae.TQTask to a *taskqueue.Task. | |
65 func tqF2R(n *gae.TQTask) *taskqueue.Task { | |
66 o := taskqueue.Task{} | |
67 o.Path = n.Path | |
68 o.Payload = n.Payload | |
69 o.Header = n.Header | |
70 o.Method = n.Method | |
71 o.Name = n.Name | |
72 o.Delay = n.Delay | |
73 o.ETA = n.ETA | |
74 o.RetryCount = n.RetryCount | |
75 o.RetryOptions = (*taskqueue.RetryOptions)(n.RetryOptions) | |
76 return &o | |
77 } | |
78 | |
79 // tqMR2FErr (TQ multi-real-to-fake w/ error) converts a slice of | |
80 // *taskqueue.Task to a slice of *gae.TQTask | |
81 func tqMR2FErr(os []*taskqueue.Task, err error) ([]*gae.TQTask, error) { | |
82 if err != nil { | |
83 return nil, gae.FixError(err) | |
84 } | |
85 ret := make([]*gae.TQTask, len(os)) | |
86 for i, t := range os { | |
87 ret[i], _ = tqR2FErr(t, nil) | |
88 } | |
89 return ret, nil | |
90 } | |
91 | |
92 // tqMF2R (TQ multi-fake-to-real) converts []*gae.TQTask to []*taskqueue.Task. | |
93 func tqMF2R(ns []*gae.TQTask) []*taskqueue.Task { | |
94 ret := make([]*taskqueue.Task, len(ns)) | |
95 for i, t := range ns { | |
96 ret[i] = tqF2R(t) | |
97 } | |
98 return ret | |
99 } | |
100 | |
101 //////// TQSingleReadWriter | |
102 | |
103 func (t tqImpl) Add(task *gae.TQTask, queueName string) (*gae.TQTask, error) { | |
104 return tqR2FErr(taskqueue.Add(t.Context, tqF2R(task), queueName)) | |
105 } | |
106 func (t tqImpl) Delete(task *gae.TQTask, queueName string) error { | |
107 return taskqueue.Delete(t.Context, tqF2R(task), queueName) | |
108 } | |
109 | |
110 //////// TQMultiReadWriter | |
111 | |
112 func (t tqImpl) AddMulti(tasks []*gae.TQTask, queueName string) ([]*gae.TQTask,
error) { | |
113 return tqMR2FErr(taskqueue.AddMulti(t.Context, tqMF2R(tasks), queueName)
) | |
114 } | |
115 func (t tqImpl) DeleteMulti(tasks []*gae.TQTask, queueName string) error { | |
116 return gae.FixError(taskqueue.DeleteMulti(t.Context, tqMF2R(tasks), queu
eName)) | |
117 } | |
118 | |
119 //////// TQLeaser | |
120 | |
121 func (t tqImpl) Lease(maxTasks int, queueName string, leaseTime int) ([]*gae.TQT
ask, error) { | |
122 return tqMR2FErr(taskqueue.Lease(t.Context, maxTasks, queueName, leaseTi
me)) | |
123 } | |
124 func (t tqImpl) LeaseByTag(maxTasks int, queueName string, leaseTime int, tag st
ring) ([]*gae.TQTask, error) { | |
125 return tqMR2FErr(taskqueue.LeaseByTag(t.Context, maxTasks, queueName, le
aseTime, tag)) | |
126 } | |
127 func (t tqImpl) ModifyLease(task *gae.TQTask, queueName string, leaseTime int) e
rror { | |
128 return taskqueue.ModifyLease(t.Context, tqF2R(task), queueName, leaseTim
e) | |
129 } | |
130 | |
131 //////// TQPurger | |
132 | |
133 func (t tqImpl) Purge(queueName string) error { | |
134 return taskqueue.Purge(t.Context, queueName) | |
135 } | |
136 | |
137 //////// TQStatter | |
138 | |
139 func (t tqImpl) QueueStats(queueNames []string) ([]gae.TQStatistics, error) { | |
140 stats, err := taskqueue.QueueStats(t.Context, queueNames) | |
141 if err != nil { | |
142 return nil, err | |
143 } | |
144 ret := make([]gae.TQStatistics, len(stats)) | |
145 for i, s := range stats { | |
146 ret[i] = gae.TQStatistics(s) | |
147 } | |
148 return ret, nil | |
149 } | |
OLD | NEW |