OLD | NEW |
---|---|
(Empty) | |
1 // Copyright 2017 The LUCI Authors. | |
2 // | |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 // you may not use this file except in compliance with the License. | |
5 // You may obtain a copy of the License at | |
6 // | |
7 // http://www.apache.org/licenses/LICENSE-2.0 | |
8 // | |
9 // Unless required by applicable law or agreed to in writing, software | |
10 // distributed under the License is distributed on an "AS IS" BASIS, | |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 // See the License for the specific language governing permissions and | |
13 // limitations under the License. | |
14 | |
15 package tq | |
16 | |
17 import ( | |
18 "bytes" | |
19 "fmt" | |
20 "net/http/httptest" | |
21 "testing" | |
22 "time" | |
23 | |
24 "golang.org/x/net/context" | |
25 | |
26 "github.com/golang/protobuf/proto" | |
27 "github.com/golang/protobuf/ptypes/duration" | |
28 "github.com/golang/protobuf/ptypes/empty" | |
29 | |
30 "github.com/luci/gae/impl/memory" | |
31 "github.com/luci/gae/service/taskqueue" | |
32 | |
33 "github.com/luci/luci-go/common/clock" | |
34 "github.com/luci/luci-go/common/clock/testclock" | |
35 "github.com/luci/luci-go/common/retry/transient" | |
36 "github.com/luci/luci-go/server/router" | |
37 | |
38 . "github.com/smartystreets/goconvey/convey" | |
39 ) | |
40 | |
41 var epoch = time.Unix(1500000000, 0).UTC() | |
42 | |
43 func TestDispatcher(t *testing.T) { | |
44 t.Parallel() | |
45 | |
46 Convey("With dispatcher", t, func() { | |
47 ctx := testingContext() | |
tandrii(chromium)
2017/07/28 09:47:17
i'd move its body here so it's obvious that you ar
Vadim Sh.
2017/07/28 19:20:30
Done.
| |
48 | |
49 d := Dispatcher{} | |
50 r := router.New() | |
51 | |
52 installRoutes := func() { | |
53 d.InstallRoutes(r, router.NewMiddlewareChain(func(c *rou ter.Context, next router.Handler) { | |
54 c.Context = ctx | |
55 next(c) | |
56 })) | |
57 } | |
58 | |
59 Convey("Single task", func() { | |
60 var calls []proto.Message | |
61 handler := func(c context.Context, payload proto.Message , execCount int) error { | |
62 calls = append(calls, payload) | |
63 return nil | |
64 } | |
65 | |
66 // Abuse some well-known proto type to simplify the test . It's doesn't | |
67 // matter what proto type we use here as long as it is r egistered in | |
68 // protobuf type registry. | |
69 d.RegisterTask(&duration.Duration{}, handler, "", nil) | |
70 installRoutes() | |
71 | |
72 err := d.AddTask(ctx, &Task{ | |
73 Payload: &duration.Duration{Seconds: 12 3}, | |
74 DeduplicationKey: "abc", | |
75 Title: "abc-def", | |
76 Delay: 30 * time.Second, | |
77 }) | |
78 So(err, ShouldBeNil) | |
79 | |
80 // Added the task. | |
81 expectedPath := "/internal/tasks/default/abc-def" | |
82 expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c52 0a11dd5244e3399346ac0d3a7" | |
83 expectedBody := []byte(`{"type":"google.protobuf.Duratio n","body":"123.000s"}`) | |
84 tasks := taskqueue.GetTestable(ctx).GetScheduledTasks() | |
85 So(tasks, ShouldResemble, taskqueue.QueueData{ | |
86 "default": map[string]*taskqueue.Task{ | |
87 expectedName: { | |
88 Path: expectedPath, | |
89 Payload: expectedBody, | |
90 Name: expectedName, | |
91 Method: "POST", | |
92 ETA: epoch.Add(30 * time.Sec ond), | |
93 }, | |
94 }, | |
95 "another-q": {}, | |
96 }) | |
97 | |
98 // Readd a task with same dedup key. Should be silently ignored. | |
99 err = d.AddTask(ctx, &Task{ | |
100 Payload: &duration.Duration{Seconds: 12 3}, | |
101 DeduplicationKey: "abc", | |
102 }) | |
103 So(err, ShouldBeNil) | |
104 | |
105 // No new tasks. | |
106 tasks = taskqueue.GetTestable(ctx).GetScheduledTasks() | |
107 So(len(tasks["default"]), ShouldResemble, 1) | |
108 | |
109 // Execute the task. | |
110 req := httptest.NewRequest("POST", "http://example.com"+ expectedPath, bytes.NewReader(expectedBody)) | |
111 rw := httptest.NewRecorder() | |
112 r.ServeHTTP(rw, req) | |
113 | |
114 // Executed. | |
115 So(calls, ShouldResemble, []proto.Message{ | |
116 &duration.Duration{Seconds: 123}, | |
117 }) | |
118 So(rw.Code, ShouldEqual, 200) | |
119 }) | |
120 | |
121 Convey("Many tasks", func() { | |
122 handler := func(c context.Context, payload proto.Message , execCount int) error { return nil } | |
123 d.RegisterTask(&duration.Duration{}, handler, "default", nil) | |
124 d.RegisterTask(&empty.Empty{}, handler, "another-q", nil ) | |
125 installRoutes() | |
126 | |
127 t := []*Task{} | |
128 for i := 0; i < 200; i++ { | |
129 var task *Task | |
130 | |
131 if i%2 == 0 { | |
132 task = &Task{ | |
133 DeduplicationKey: fmt.Sprintf("% d", i/2), | |
134 Payload: &duration.Dura tion{}, | |
135 Delay: time.Duration( i) * time.Second, | |
136 } | |
137 } else { | |
138 task = &Task{ | |
139 DeduplicationKey: fmt.Sprintf("% d", (i-1)/2), | |
140 Payload: &empty.Empty{} , | |
141 Delay: time.Duration( i) * time.Second, | |
142 } | |
143 } | |
144 | |
145 t = append(t, task) | |
146 | |
147 // Mix in some duplicates. | |
148 if i > 0 && i%100 == 0 { | |
149 t = append(t, task) | |
150 } | |
151 } | |
152 err := d.AddTask(ctx, t...) | |
153 So(err, ShouldBeNil) | |
154 | |
155 // Added all the tasks. | |
156 allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks () | |
157 delaysDefault := map[time.Duration]struct{}{} | |
158 for _, task := range allTasks["default"] { | |
159 delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{} | |
160 } | |
161 delaysAnotherQ := map[time.Duration]struct{}{} | |
162 for _, task := range allTasks["another-q"] { | |
163 delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{} | |
164 } | |
165 So(len(delaysDefault), ShouldEqual, 100) | |
166 So(len(delaysAnotherQ), ShouldEqual, 100) | |
167 }) | |
168 | |
169 Convey("Execution errors", func() { | |
170 var returnErr error | |
171 panicNow := false | |
172 handler := func(c context.Context, payload proto.Message , execCount int) error { | |
173 if panicNow { | |
174 panic("must not be called") | |
175 } | |
176 return returnErr | |
177 } | |
178 | |
179 d.RegisterTask(&duration.Duration{}, handler, "", nil) | |
180 installRoutes() | |
181 | |
182 goodBody := `{"type":"google.protobuf.Duration","body":" 123.000s"}` | |
183 | |
184 execute := func(body string) *httptest.ResponseRecorder { | |
185 req := httptest.NewRequest( | |
186 "POST", | |
187 "http://example.com/internal/tasks/defau lt/abc-def", | |
188 bytes.NewReader([]byte(body))) | |
189 rw := httptest.NewRecorder() | |
190 r.ServeHTTP(rw, req) | |
191 return rw | |
192 } | |
193 | |
194 // Error conditions inside the task body. | |
195 | |
196 returnErr = nil | |
197 rw := execute(goodBody) | |
198 So(rw.Code, ShouldEqual, 200) | |
199 So(rw.Body.String(), ShouldEqual, "OK\n") | |
200 | |
201 returnErr = fmt.Errorf("fatal err") | |
202 rw = execute(goodBody) | |
203 So(rw.Code, ShouldEqual, 202) // no retry! | |
204 So(rw.Body.String(), ShouldEqual, "Fatal error: fatal er r\n") | |
205 | |
206 returnErr = transient.Tag.Apply(fmt.Errorf("transient er r")) | |
207 rw = execute(goodBody) | |
208 So(rw.Code, ShouldEqual, 500) // 500 for retry | |
209 So(rw.Body.String(), ShouldEqual, "Transient error: tran sient err\n") | |
210 | |
211 // Error conditions when routing the task. | |
212 | |
213 panicNow = true | |
214 | |
215 rw = execute("not a json") | |
216 So(rw.Code, ShouldEqual, 202) // no retry! | |
217 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize") | |
218 | |
219 rw = execute(`{"type":"google.protobuf.Duration"}`) | |
220 So(rw.Code, ShouldEqual, 202) // no retry! | |
221 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize") | |
222 | |
223 rw = execute(`{"type":"google.protobuf.Duration","body": "blah"}`) | |
224 So(rw.Code, ShouldEqual, 202) // no retry! | |
225 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize") | |
226 | |
227 rw = execute(`{"type":"unknown.proto.type","body":"{}"}` ) | |
228 So(rw.Code, ShouldEqual, 202) // no retry! | |
229 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize") | |
230 }) | |
231 }) | |
232 } | |
233 | |
234 func testingContext() context.Context { | |
235 c := memory.Use(context.Background()) | |
236 c = clock.Set(c, testclock.New(epoch)) | |
237 taskqueue.GetTestable(c).CreateQueue("another-q") | |
238 return c | |
239 } | |
OLD | NEW |