Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(457)

Side by Side Diff: scheduler/appengine/engine/tq/tq_test.go

Issue 2981043002: Add a task queue task router to reduce amount of boilerplate. (Closed)
Patch Set: nits Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « scheduler/appengine/engine/tq/tq.go ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2017 The LUCI Authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package tq
16
17 import (
18 "bytes"
19 "fmt"
20 "net/http/httptest"
21 "testing"
22 "time"
23
24 "golang.org/x/net/context"
25
26 "github.com/golang/protobuf/proto"
27 "github.com/golang/protobuf/ptypes/duration"
28 "github.com/golang/protobuf/ptypes/empty"
29
30 "github.com/luci/gae/impl/memory"
31 "github.com/luci/gae/service/taskqueue"
32
33 "github.com/luci/luci-go/common/clock"
34 "github.com/luci/luci-go/common/clock/testclock"
35 "github.com/luci/luci-go/common/retry/transient"
36 "github.com/luci/luci-go/server/router"
37
38 . "github.com/smartystreets/goconvey/convey"
39 )
40
41 var epoch = time.Unix(1500000000, 0).UTC()
42
43 func TestDispatcher(t *testing.T) {
44 t.Parallel()
45
46 Convey("With dispatcher", t, func() {
47 ctx := memory.Use(context.Background())
48 ctx = clock.Set(ctx, testclock.New(epoch))
49 taskqueue.GetTestable(ctx).CreateQueue("another-q")
50
51 d := Dispatcher{}
52 r := router.New()
53
54 installRoutes := func() {
55 d.InstallRoutes(r, router.NewMiddlewareChain(func(c *rou ter.Context, next router.Handler) {
56 c.Context = ctx
57 next(c)
58 }))
59 }
60
61 Convey("Single task", func() {
62 var calls []proto.Message
63 handler := func(c context.Context, payload proto.Message , execCount int) error {
64 calls = append(calls, payload)
65 return nil
66 }
67
68 // Abuse some well-known proto type to simplify the test . It's doesn't
69 // matter what proto type we use here as long as it is r egistered in
70 // protobuf type registry.
71 d.RegisterTask(&duration.Duration{}, handler, "", nil)
72 installRoutes()
73
74 err := d.AddTask(ctx, &Task{
75 Payload: &duration.Duration{Seconds: 12 3},
76 DeduplicationKey: "abc",
77 Title: "abc-def",
78 Delay: 30 * time.Second,
79 })
80 So(err, ShouldBeNil)
81
82 // Added the task.
83 expectedPath := "/internal/tasks/default/abc-def"
84 expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c52 0a11dd5244e3399346ac0d3a7"
85 expectedBody := []byte(`{"type":"google.protobuf.Duratio n","body":"123.000s"}`)
86 tasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
87 So(tasks, ShouldResemble, taskqueue.QueueData{
88 "default": map[string]*taskqueue.Task{
89 expectedName: {
90 Path: expectedPath,
91 Payload: expectedBody,
92 Name: expectedName,
93 Method: "POST",
94 ETA: epoch.Add(30 * time.Sec ond),
95 },
96 },
97 "another-q": {},
98 })
99
100 // Readd a task with same dedup key. Should be silently ignored.
101 err = d.AddTask(ctx, &Task{
102 Payload: &duration.Duration{Seconds: 12 3},
103 DeduplicationKey: "abc",
104 })
105 So(err, ShouldBeNil)
106
107 // No new tasks.
108 tasks = taskqueue.GetTestable(ctx).GetScheduledTasks()
109 So(len(tasks["default"]), ShouldResemble, 1)
110
111 // Execute the task.
112 req := httptest.NewRequest("POST", "http://example.com"+ expectedPath, bytes.NewReader(expectedBody))
113 rw := httptest.NewRecorder()
114 r.ServeHTTP(rw, req)
115
116 // Executed.
117 So(calls, ShouldResemble, []proto.Message{
118 &duration.Duration{Seconds: 123},
119 })
120 So(rw.Code, ShouldEqual, 200)
121 })
122
123 Convey("Many tasks", func() {
124 handler := func(c context.Context, payload proto.Message , execCount int) error { return nil }
125 d.RegisterTask(&duration.Duration{}, handler, "default", nil)
126 d.RegisterTask(&empty.Empty{}, handler, "another-q", nil )
127 installRoutes()
128
129 t := []*Task{}
130 for i := 0; i < 200; i++ {
131 var task *Task
132
133 if i%2 == 0 {
134 task = &Task{
135 DeduplicationKey: fmt.Sprintf("% d", i/2),
136 Payload: &duration.Dura tion{},
137 Delay: time.Duration( i) * time.Second,
138 }
139 } else {
140 task = &Task{
141 DeduplicationKey: fmt.Sprintf("% d", (i-1)/2),
142 Payload: &empty.Empty{} ,
143 Delay: time.Duration( i) * time.Second,
144 }
145 }
146
147 t = append(t, task)
148
149 // Mix in some duplicates.
150 if i > 0 && i%100 == 0 {
151 t = append(t, task)
152 }
153 }
154 err := d.AddTask(ctx, t...)
155 So(err, ShouldBeNil)
156
157 // Added all the tasks.
158 allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks ()
159 delaysDefault := map[time.Duration]struct{}{}
160 for _, task := range allTasks["default"] {
161 delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{}
162 }
163 delaysAnotherQ := map[time.Duration]struct{}{}
164 for _, task := range allTasks["another-q"] {
165 delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{}
166 }
167 So(len(delaysDefault), ShouldEqual, 100)
168 So(len(delaysAnotherQ), ShouldEqual, 100)
169 })
170
171 Convey("Execution errors", func() {
172 var returnErr error
173 panicNow := false
174 handler := func(c context.Context, payload proto.Message , execCount int) error {
175 if panicNow {
176 panic("must not be called")
177 }
178 return returnErr
179 }
180
181 d.RegisterTask(&duration.Duration{}, handler, "", nil)
182 installRoutes()
183
184 goodBody := `{"type":"google.protobuf.Duration","body":" 123.000s"}`
185
186 execute := func(body string) *httptest.ResponseRecorder {
187 req := httptest.NewRequest(
188 "POST",
189 "http://example.com/internal/tasks/defau lt/abc-def",
190 bytes.NewReader([]byte(body)))
191 rw := httptest.NewRecorder()
192 r.ServeHTTP(rw, req)
193 return rw
194 }
195
196 // Error conditions inside the task body.
197
198 returnErr = nil
199 rw := execute(goodBody)
200 So(rw.Code, ShouldEqual, 200)
201 So(rw.Body.String(), ShouldEqual, "OK\n")
202
203 returnErr = fmt.Errorf("fatal err")
204 rw = execute(goodBody)
205 So(rw.Code, ShouldEqual, 202) // no retry!
206 So(rw.Body.String(), ShouldEqual, "Fatal error: fatal er r\n")
207
208 returnErr = transient.Tag.Apply(fmt.Errorf("transient er r"))
209 rw = execute(goodBody)
210 So(rw.Code, ShouldEqual, 500) // 500 for retry
211 So(rw.Body.String(), ShouldEqual, "Transient error: tran sient err\n")
212
213 // Error conditions when routing the task.
214
215 panicNow = true
216
217 rw = execute("not a json")
218 So(rw.Code, ShouldEqual, 202) // no retry!
219 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize")
220
221 rw = execute(`{"type":"google.protobuf.Duration"}`)
222 So(rw.Code, ShouldEqual, 202) // no retry!
223 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize")
224
225 rw = execute(`{"type":"google.protobuf.Duration","body": "blah"}`)
226 So(rw.Code, ShouldEqual, 202) // no retry!
227 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize")
228
229 rw = execute(`{"type":"unknown.proto.type","body":"{}"}` )
230 So(rw.Code, ShouldEqual, 202) // no retry!
231 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize")
232 })
233 })
234 }
OLDNEW
« no previous file with comments | « scheduler/appengine/engine/tq/tq.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698