Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(959)

Side by Side Diff: scheduler/appengine/engine/tq/tq_test.go

Issue 2981043002: Add a task queue task router to reduce amount of boilerplate. (Closed)
Patch Set: use 202 Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package tq
16
17 import (
18 "bytes"
19 "fmt"
20 "net/http/httptest"
21 "testing"
22 "time"
23
24 "golang.org/x/net/context"
25
26 "github.com/golang/protobuf/proto"
27 "github.com/golang/protobuf/ptypes/duration"
28 "github.com/golang/protobuf/ptypes/empty"
29
30 "github.com/luci/gae/impl/memory"
31 "github.com/luci/gae/service/taskqueue"
32
33 "github.com/luci/luci-go/common/clock"
34 "github.com/luci/luci-go/common/clock/testclock"
35 "github.com/luci/luci-go/common/retry/transient"
36 "github.com/luci/luci-go/server/router"
37
38 . "github.com/smartystreets/goconvey/convey"
39 )
40
41 var epoch = time.Unix(1500000000, 0).UTC()
42
43 func TestDispatcher(t *testing.T) {
44 t.Parallel()
45
46 Convey("With dispatcher", t, func() {
47 ctx := testingContext()
tandrii(chromium) 2017/07/28 09:47:17 i'd move its body here so it's obvious that you ar
Vadim Sh. 2017/07/28 19:20:30 Done.
48
49 d := Dispatcher{}
50 r := router.New()
51
52 installRoutes := func() {
53 d.InstallRoutes(r, router.NewMiddlewareChain(func(c *rou ter.Context, next router.Handler) {
54 c.Context = ctx
55 next(c)
56 }))
57 }
58
59 Convey("Single task", func() {
60 var calls []proto.Message
61 handler := func(c context.Context, payload proto.Message , execCount int) error {
62 calls = append(calls, payload)
63 return nil
64 }
65
66 // Abuse some well-known proto type to simplify the test . It's doesn't
67 // matter what proto type we use here as long as it is r egistered in
68 // protobuf type registry.
69 d.RegisterTask(&duration.Duration{}, handler, "", nil)
70 installRoutes()
71
72 err := d.AddTask(ctx, &Task{
73 Payload: &duration.Duration{Seconds: 12 3},
74 DeduplicationKey: "abc",
75 Title: "abc-def",
76 Delay: 30 * time.Second,
77 })
78 So(err, ShouldBeNil)
79
80 // Added the task.
81 expectedPath := "/internal/tasks/default/abc-def"
82 expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c52 0a11dd5244e3399346ac0d3a7"
83 expectedBody := []byte(`{"type":"google.protobuf.Duratio n","body":"123.000s"}`)
84 tasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
85 So(tasks, ShouldResemble, taskqueue.QueueData{
86 "default": map[string]*taskqueue.Task{
87 expectedName: {
88 Path: expectedPath,
89 Payload: expectedBody,
90 Name: expectedName,
91 Method: "POST",
92 ETA: epoch.Add(30 * time.Sec ond),
93 },
94 },
95 "another-q": {},
96 })
97
98 // Readd a task with same dedup key. Should be silently ignored.
99 err = d.AddTask(ctx, &Task{
100 Payload: &duration.Duration{Seconds: 12 3},
101 DeduplicationKey: "abc",
102 })
103 So(err, ShouldBeNil)
104
105 // No new tasks.
106 tasks = taskqueue.GetTestable(ctx).GetScheduledTasks()
107 So(len(tasks["default"]), ShouldResemble, 1)
108
109 // Execute the task.
110 req := httptest.NewRequest("POST", "http://example.com"+ expectedPath, bytes.NewReader(expectedBody))
111 rw := httptest.NewRecorder()
112 r.ServeHTTP(rw, req)
113
114 // Executed.
115 So(calls, ShouldResemble, []proto.Message{
116 &duration.Duration{Seconds: 123},
117 })
118 So(rw.Code, ShouldEqual, 200)
119 })
120
121 Convey("Many tasks", func() {
122 handler := func(c context.Context, payload proto.Message , execCount int) error { return nil }
123 d.RegisterTask(&duration.Duration{}, handler, "default", nil)
124 d.RegisterTask(&empty.Empty{}, handler, "another-q", nil )
125 installRoutes()
126
127 t := []*Task{}
128 for i := 0; i < 200; i++ {
129 var task *Task
130
131 if i%2 == 0 {
132 task = &Task{
133 DeduplicationKey: fmt.Sprintf("% d", i/2),
134 Payload: &duration.Dura tion{},
135 Delay: time.Duration( i) * time.Second,
136 }
137 } else {
138 task = &Task{
139 DeduplicationKey: fmt.Sprintf("% d", (i-1)/2),
140 Payload: &empty.Empty{} ,
141 Delay: time.Duration( i) * time.Second,
142 }
143 }
144
145 t = append(t, task)
146
147 // Mix in some duplicates.
148 if i > 0 && i%100 == 0 {
149 t = append(t, task)
150 }
151 }
152 err := d.AddTask(ctx, t...)
153 So(err, ShouldBeNil)
154
155 // Added all the tasks.
156 allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks ()
157 delaysDefault := map[time.Duration]struct{}{}
158 for _, task := range allTasks["default"] {
159 delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{}
160 }
161 delaysAnotherQ := map[time.Duration]struct{}{}
162 for _, task := range allTasks["another-q"] {
163 delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{}
164 }
165 So(len(delaysDefault), ShouldEqual, 100)
166 So(len(delaysAnotherQ), ShouldEqual, 100)
167 })
168
169 Convey("Execution errors", func() {
170 var returnErr error
171 panicNow := false
172 handler := func(c context.Context, payload proto.Message , execCount int) error {
173 if panicNow {
174 panic("must not be called")
175 }
176 return returnErr
177 }
178
179 d.RegisterTask(&duration.Duration{}, handler, "", nil)
180 installRoutes()
181
182 goodBody := `{"type":"google.protobuf.Duration","body":" 123.000s"}`
183
184 execute := func(body string) *httptest.ResponseRecorder {
185 req := httptest.NewRequest(
186 "POST",
187 "http://example.com/internal/tasks/defau lt/abc-def",
188 bytes.NewReader([]byte(body)))
189 rw := httptest.NewRecorder()
190 r.ServeHTTP(rw, req)
191 return rw
192 }
193
194 // Error conditions inside the task body.
195
196 returnErr = nil
197 rw := execute(goodBody)
198 So(rw.Code, ShouldEqual, 200)
199 So(rw.Body.String(), ShouldEqual, "OK\n")
200
201 returnErr = fmt.Errorf("fatal err")
202 rw = execute(goodBody)
203 So(rw.Code, ShouldEqual, 202) // no retry!
204 So(rw.Body.String(), ShouldEqual, "Fatal error: fatal er r\n")
205
206 returnErr = transient.Tag.Apply(fmt.Errorf("transient er r"))
207 rw = execute(goodBody)
208 So(rw.Code, ShouldEqual, 500) // 500 for retry
209 So(rw.Body.String(), ShouldEqual, "Transient error: tran sient err\n")
210
211 // Error conditions when routing the task.
212
213 panicNow = true
214
215 rw = execute("not a json")
216 So(rw.Code, ShouldEqual, 202) // no retry!
217 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize")
218
219 rw = execute(`{"type":"google.protobuf.Duration"}`)
220 So(rw.Code, ShouldEqual, 202) // no retry!
221 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize")
222
223 rw = execute(`{"type":"google.protobuf.Duration","body": "blah"}`)
224 So(rw.Code, ShouldEqual, 202) // no retry!
225 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize")
226
227 rw = execute(`{"type":"unknown.proto.type","body":"{}"}` )
228 So(rw.Code, ShouldEqual, 202) // no retry!
229 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize")
230 })
231 })
232 }
233
234 func testingContext() context.Context {
235 c := memory.Use(context.Background())
236 c = clock.Set(c, testclock.New(epoch))
237 taskqueue.GetTestable(c).CreateQueue("another-q")
238 return c
239 }
OLDNEW
« scheduler/appengine/engine/tq/tq.go ('K') | « scheduler/appengine/engine/tq/tq.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698