| OLD | NEW |
| (Empty) |
| 1 // Copyright 2017 The LUCI Authors. | |
| 2 // | |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 // you may not use this file except in compliance with the License. | |
| 5 // You may obtain a copy of the License at | |
| 6 // | |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 // | |
| 9 // Unless required by applicable law or agreed to in writing, software | |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 // See the License for the specific language governing permissions and | |
| 13 // limitations under the License. | |
| 14 | |
| 15 package tq | |
| 16 | |
| 17 import ( | |
| 18 "bytes" | |
| 19 "fmt" | |
| 20 "net/http/httptest" | |
| 21 "testing" | |
| 22 "time" | |
| 23 | |
| 24 "golang.org/x/net/context" | |
| 25 | |
| 26 "github.com/golang/protobuf/proto" | |
| 27 "github.com/golang/protobuf/ptypes/duration" | |
| 28 "github.com/golang/protobuf/ptypes/empty" | |
| 29 | |
| 30 "github.com/luci/gae/impl/memory" | |
| 31 "github.com/luci/gae/service/taskqueue" | |
| 32 | |
| 33 "github.com/luci/luci-go/common/clock" | |
| 34 "github.com/luci/luci-go/common/clock/testclock" | |
| 35 "github.com/luci/luci-go/common/retry/transient" | |
| 36 "github.com/luci/luci-go/server/router" | |
| 37 | |
| 38 . "github.com/smartystreets/goconvey/convey" | |
| 39 ) | |
| 40 | |
| 41 var epoch = time.Unix(1500000000, 0).UTC() | |
| 42 | |
| 43 func TestDispatcher(t *testing.T) { | |
| 44 t.Parallel() | |
| 45 | |
| 46 Convey("With dispatcher", t, func() { | |
| 47 ctx := memory.Use(context.Background()) | |
| 48 ctx = clock.Set(ctx, testclock.New(epoch)) | |
| 49 taskqueue.GetTestable(ctx).CreateQueue("another-q") | |
| 50 | |
| 51 d := Dispatcher{} | |
| 52 r := router.New() | |
| 53 | |
| 54 installRoutes := func() { | |
| 55 d.InstallRoutes(r, router.NewMiddlewareChain(func(c *rou
ter.Context, next router.Handler) { | |
| 56 c.Context = ctx | |
| 57 next(c) | |
| 58 })) | |
| 59 } | |
| 60 | |
| 61 Convey("Single task", func() { | |
| 62 var calls []proto.Message | |
| 63 handler := func(c context.Context, payload proto.Message
, execCount int) error { | |
| 64 calls = append(calls, payload) | |
| 65 return nil | |
| 66 } | |
| 67 | |
| 68 // Abuse some well-known proto type to simplify the test
. It's doesn't | |
| 69 // matter what proto type we use here as long as it is r
egistered in | |
| 70 // protobuf type registry. | |
| 71 d.RegisterTask(&duration.Duration{}, handler, "", nil) | |
| 72 installRoutes() | |
| 73 | |
| 74 err := d.AddTask(ctx, &Task{ | |
| 75 Payload: &duration.Duration{Seconds: 12
3}, | |
| 76 DeduplicationKey: "abc", | |
| 77 Title: "abc-def", | |
| 78 Delay: 30 * time.Second, | |
| 79 }) | |
| 80 So(err, ShouldBeNil) | |
| 81 | |
| 82 // Added the task. | |
| 83 expectedPath := "/internal/tasks/default/abc-def" | |
| 84 expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c52
0a11dd5244e3399346ac0d3a7" | |
| 85 expectedBody := []byte(`{"type":"google.protobuf.Duratio
n","body":"123.000s"}`) | |
| 86 tasks := taskqueue.GetTestable(ctx).GetScheduledTasks() | |
| 87 So(tasks, ShouldResemble, taskqueue.QueueData{ | |
| 88 "default": map[string]*taskqueue.Task{ | |
| 89 expectedName: { | |
| 90 Path: expectedPath, | |
| 91 Payload: expectedBody, | |
| 92 Name: expectedName, | |
| 93 Method: "POST", | |
| 94 ETA: epoch.Add(30 * time.Sec
ond), | |
| 95 }, | |
| 96 }, | |
| 97 "another-q": {}, | |
| 98 }) | |
| 99 | |
| 100 // Readd a task with same dedup key. Should be silently
ignored. | |
| 101 err = d.AddTask(ctx, &Task{ | |
| 102 Payload: &duration.Duration{Seconds: 12
3}, | |
| 103 DeduplicationKey: "abc", | |
| 104 }) | |
| 105 So(err, ShouldBeNil) | |
| 106 | |
| 107 // No new tasks. | |
| 108 tasks = taskqueue.GetTestable(ctx).GetScheduledTasks() | |
| 109 So(len(tasks["default"]), ShouldResemble, 1) | |
| 110 | |
| 111 // Execute the task. | |
| 112 req := httptest.NewRequest("POST", "http://example.com"+
expectedPath, bytes.NewReader(expectedBody)) | |
| 113 rw := httptest.NewRecorder() | |
| 114 r.ServeHTTP(rw, req) | |
| 115 | |
| 116 // Executed. | |
| 117 So(calls, ShouldResemble, []proto.Message{ | |
| 118 &duration.Duration{Seconds: 123}, | |
| 119 }) | |
| 120 So(rw.Code, ShouldEqual, 200) | |
| 121 }) | |
| 122 | |
| 123 Convey("Many tasks", func() { | |
| 124 handler := func(c context.Context, payload proto.Message
, execCount int) error { return nil } | |
| 125 d.RegisterTask(&duration.Duration{}, handler, "default",
nil) | |
| 126 d.RegisterTask(&empty.Empty{}, handler, "another-q", nil
) | |
| 127 installRoutes() | |
| 128 | |
| 129 t := []*Task{} | |
| 130 for i := 0; i < 200; i++ { | |
| 131 var task *Task | |
| 132 | |
| 133 if i%2 == 0 { | |
| 134 task = &Task{ | |
| 135 DeduplicationKey: fmt.Sprintf("%
d", i/2), | |
| 136 Payload: &duration.Dura
tion{}, | |
| 137 Delay: time.Duration(
i) * time.Second, | |
| 138 } | |
| 139 } else { | |
| 140 task = &Task{ | |
| 141 DeduplicationKey: fmt.Sprintf("%
d", (i-1)/2), | |
| 142 Payload: &empty.Empty{}
, | |
| 143 Delay: time.Duration(
i) * time.Second, | |
| 144 } | |
| 145 } | |
| 146 | |
| 147 t = append(t, task) | |
| 148 | |
| 149 // Mix in some duplicates. | |
| 150 if i > 0 && i%100 == 0 { | |
| 151 t = append(t, task) | |
| 152 } | |
| 153 } | |
| 154 err := d.AddTask(ctx, t...) | |
| 155 So(err, ShouldBeNil) | |
| 156 | |
| 157 // Added all the tasks. | |
| 158 allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks
() | |
| 159 delaysDefault := map[time.Duration]struct{}{} | |
| 160 for _, task := range allTasks["default"] { | |
| 161 delaysDefault[task.ETA.Sub(epoch)/time.Second] =
struct{}{} | |
| 162 } | |
| 163 delaysAnotherQ := map[time.Duration]struct{}{} | |
| 164 for _, task := range allTasks["another-q"] { | |
| 165 delaysAnotherQ[task.ETA.Sub(epoch)/time.Second]
= struct{}{} | |
| 166 } | |
| 167 So(len(delaysDefault), ShouldEqual, 100) | |
| 168 So(len(delaysAnotherQ), ShouldEqual, 100) | |
| 169 }) | |
| 170 | |
| 171 Convey("Execution errors", func() { | |
| 172 var returnErr error | |
| 173 panicNow := false | |
| 174 handler := func(c context.Context, payload proto.Message
, execCount int) error { | |
| 175 if panicNow { | |
| 176 panic("must not be called") | |
| 177 } | |
| 178 return returnErr | |
| 179 } | |
| 180 | |
| 181 d.RegisterTask(&duration.Duration{}, handler, "", nil) | |
| 182 installRoutes() | |
| 183 | |
| 184 goodBody := `{"type":"google.protobuf.Duration","body":"
123.000s"}` | |
| 185 | |
| 186 execute := func(body string) *httptest.ResponseRecorder
{ | |
| 187 req := httptest.NewRequest( | |
| 188 "POST", | |
| 189 "http://example.com/internal/tasks/defau
lt/abc-def", | |
| 190 bytes.NewReader([]byte(body))) | |
| 191 rw := httptest.NewRecorder() | |
| 192 r.ServeHTTP(rw, req) | |
| 193 return rw | |
| 194 } | |
| 195 | |
| 196 // Error conditions inside the task body. | |
| 197 | |
| 198 returnErr = nil | |
| 199 rw := execute(goodBody) | |
| 200 So(rw.Code, ShouldEqual, 200) | |
| 201 So(rw.Body.String(), ShouldEqual, "OK\n") | |
| 202 | |
| 203 returnErr = fmt.Errorf("fatal err") | |
| 204 rw = execute(goodBody) | |
| 205 So(rw.Code, ShouldEqual, 202) // no retry! | |
| 206 So(rw.Body.String(), ShouldEqual, "Fatal error: fatal er
r\n") | |
| 207 | |
| 208 returnErr = transient.Tag.Apply(fmt.Errorf("transient er
r")) | |
| 209 rw = execute(goodBody) | |
| 210 So(rw.Code, ShouldEqual, 500) // 500 for retry | |
| 211 So(rw.Body.String(), ShouldEqual, "Transient error: tran
sient err\n") | |
| 212 | |
| 213 // Error conditions when routing the task. | |
| 214 | |
| 215 panicNow = true | |
| 216 | |
| 217 rw = execute("not a json") | |
| 218 So(rw.Code, ShouldEqual, 202) // no retry! | |
| 219 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") | |
| 220 | |
| 221 rw = execute(`{"type":"google.protobuf.Duration"}`) | |
| 222 So(rw.Code, ShouldEqual, 202) // no retry! | |
| 223 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") | |
| 224 | |
| 225 rw = execute(`{"type":"google.protobuf.Duration","body":
"blah"}`) | |
| 226 So(rw.Code, ShouldEqual, 202) // no retry! | |
| 227 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") | |
| 228 | |
| 229 rw = execute(`{"type":"unknown.proto.type","body":"{}"}`
) | |
| 230 So(rw.Code, ShouldEqual, 202) // no retry! | |
| 231 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") | |
| 232 }) | |
| 233 }) | |
| 234 } | |
| OLD | NEW |