Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright 2017 The LUCI Authors. | |
| 2 // | |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 // you may not use this file except in compliance with the License. | |
| 5 // You may obtain a copy of the License at | |
| 6 // | |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 // | |
| 9 // Unless required by applicable law or agreed to in writing, software | |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 // See the License for the specific language governing permissions and | |
| 13 // limitations under the License. | |
| 14 | |
| 15 package tq | |
| 16 | |
| 17 import ( | |
| 18 "bytes" | |
| 19 "fmt" | |
| 20 "net/http/httptest" | |
| 21 "testing" | |
| 22 "time" | |
| 23 | |
| 24 "golang.org/x/net/context" | |
| 25 | |
| 26 "github.com/golang/protobuf/proto" | |
| 27 "github.com/golang/protobuf/ptypes/duration" | |
| 28 "github.com/golang/protobuf/ptypes/empty" | |
| 29 | |
| 30 "github.com/luci/gae/impl/memory" | |
| 31 "github.com/luci/gae/service/taskqueue" | |
| 32 | |
| 33 "github.com/luci/luci-go/common/clock" | |
| 34 "github.com/luci/luci-go/common/clock/testclock" | |
| 35 "github.com/luci/luci-go/common/retry/transient" | |
| 36 "github.com/luci/luci-go/server/router" | |
| 37 | |
| 38 . "github.com/smartystreets/goconvey/convey" | |
| 39 ) | |
| 40 | |
| 41 var epoch = time.Unix(1500000000, 0).UTC() | |
| 42 | |
| 43 func TestDispatcher(t *testing.T) { | |
| 44 t.Parallel() | |
| 45 | |
| 46 Convey("With dispatcher", t, func() { | |
| 47 ctx := testingContext() | |
|
tandrii(chromium)
2017/07/28 09:47:17
i'd move its body here so it's obvious that you ar
Vadim Sh.
2017/07/28 19:20:30
Done.
| |
| 48 | |
| 49 d := Dispatcher{} | |
| 50 r := router.New() | |
| 51 | |
| 52 installRoutes := func() { | |
| 53 d.InstallRoutes(r, router.NewMiddlewareChain(func(c *rou ter.Context, next router.Handler) { | |
| 54 c.Context = ctx | |
| 55 next(c) | |
| 56 })) | |
| 57 } | |
| 58 | |
| 59 Convey("Single task", func() { | |
| 60 var calls []proto.Message | |
| 61 handler := func(c context.Context, payload proto.Message , execCount int) error { | |
| 62 calls = append(calls, payload) | |
| 63 return nil | |
| 64 } | |
| 65 | |
| 66 // Abuse some well-known proto type to simplify the test . It's doesn't | |
| 67 // matter what proto type we use here as long as it is r egistered in | |
| 68 // protobuf type registry. | |
| 69 d.RegisterTask(&duration.Duration{}, handler, "", nil) | |
| 70 installRoutes() | |
| 71 | |
| 72 err := d.AddTask(ctx, &Task{ | |
| 73 Payload: &duration.Duration{Seconds: 12 3}, | |
| 74 DeduplicationKey: "abc", | |
| 75 Title: "abc-def", | |
| 76 Delay: 30 * time.Second, | |
| 77 }) | |
| 78 So(err, ShouldBeNil) | |
| 79 | |
| 80 // Added the task. | |
| 81 expectedPath := "/internal/tasks/default/abc-def" | |
| 82 expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c52 0a11dd5244e3399346ac0d3a7" | |
| 83 expectedBody := []byte(`{"type":"google.protobuf.Duratio n","body":"123.000s"}`) | |
| 84 tasks := taskqueue.GetTestable(ctx).GetScheduledTasks() | |
| 85 So(tasks, ShouldResemble, taskqueue.QueueData{ | |
| 86 "default": map[string]*taskqueue.Task{ | |
| 87 expectedName: { | |
| 88 Path: expectedPath, | |
| 89 Payload: expectedBody, | |
| 90 Name: expectedName, | |
| 91 Method: "POST", | |
| 92 ETA: epoch.Add(30 * time.Sec ond), | |
| 93 }, | |
| 94 }, | |
| 95 "another-q": {}, | |
| 96 }) | |
| 97 | |
| 98 // Readd a task with same dedup key. Should be silently ignored. | |
| 99 err = d.AddTask(ctx, &Task{ | |
| 100 Payload: &duration.Duration{Seconds: 12 3}, | |
| 101 DeduplicationKey: "abc", | |
| 102 }) | |
| 103 So(err, ShouldBeNil) | |
| 104 | |
| 105 // No new tasks. | |
| 106 tasks = taskqueue.GetTestable(ctx).GetScheduledTasks() | |
| 107 So(len(tasks["default"]), ShouldResemble, 1) | |
| 108 | |
| 109 // Execute the task. | |
| 110 req := httptest.NewRequest("POST", "http://example.com"+ expectedPath, bytes.NewReader(expectedBody)) | |
| 111 rw := httptest.NewRecorder() | |
| 112 r.ServeHTTP(rw, req) | |
| 113 | |
| 114 // Executed. | |
| 115 So(calls, ShouldResemble, []proto.Message{ | |
| 116 &duration.Duration{Seconds: 123}, | |
| 117 }) | |
| 118 So(rw.Code, ShouldEqual, 200) | |
| 119 }) | |
| 120 | |
| 121 Convey("Many tasks", func() { | |
| 122 handler := func(c context.Context, payload proto.Message , execCount int) error { return nil } | |
| 123 d.RegisterTask(&duration.Duration{}, handler, "default", nil) | |
| 124 d.RegisterTask(&empty.Empty{}, handler, "another-q", nil ) | |
| 125 installRoutes() | |
| 126 | |
| 127 t := []*Task{} | |
| 128 for i := 0; i < 200; i++ { | |
| 129 var task *Task | |
| 130 | |
| 131 if i%2 == 0 { | |
| 132 task = &Task{ | |
| 133 DeduplicationKey: fmt.Sprintf("% d", i/2), | |
| 134 Payload: &duration.Dura tion{}, | |
| 135 Delay: time.Duration( i) * time.Second, | |
| 136 } | |
| 137 } else { | |
| 138 task = &Task{ | |
| 139 DeduplicationKey: fmt.Sprintf("% d", (i-1)/2), | |
| 140 Payload: &empty.Empty{} , | |
| 141 Delay: time.Duration( i) * time.Second, | |
| 142 } | |
| 143 } | |
| 144 | |
| 145 t = append(t, task) | |
| 146 | |
| 147 // Mix in some duplicates. | |
| 148 if i > 0 && i%100 == 0 { | |
| 149 t = append(t, task) | |
| 150 } | |
| 151 } | |
| 152 err := d.AddTask(ctx, t...) | |
| 153 So(err, ShouldBeNil) | |
| 154 | |
| 155 // Added all the tasks. | |
| 156 allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks () | |
| 157 delaysDefault := map[time.Duration]struct{}{} | |
| 158 for _, task := range allTasks["default"] { | |
| 159 delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{} | |
| 160 } | |
| 161 delaysAnotherQ := map[time.Duration]struct{}{} | |
| 162 for _, task := range allTasks["another-q"] { | |
| 163 delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{} | |
| 164 } | |
| 165 So(len(delaysDefault), ShouldEqual, 100) | |
| 166 So(len(delaysAnotherQ), ShouldEqual, 100) | |
| 167 }) | |
| 168 | |
| 169 Convey("Execution errors", func() { | |
| 170 var returnErr error | |
| 171 panicNow := false | |
| 172 handler := func(c context.Context, payload proto.Message , execCount int) error { | |
| 173 if panicNow { | |
| 174 panic("must not be called") | |
| 175 } | |
| 176 return returnErr | |
| 177 } | |
| 178 | |
| 179 d.RegisterTask(&duration.Duration{}, handler, "", nil) | |
| 180 installRoutes() | |
| 181 | |
| 182 goodBody := `{"type":"google.protobuf.Duration","body":" 123.000s"}` | |
| 183 | |
| 184 execute := func(body string) *httptest.ResponseRecorder { | |
| 185 req := httptest.NewRequest( | |
| 186 "POST", | |
| 187 "http://example.com/internal/tasks/defau lt/abc-def", | |
| 188 bytes.NewReader([]byte(body))) | |
| 189 rw := httptest.NewRecorder() | |
| 190 r.ServeHTTP(rw, req) | |
| 191 return rw | |
| 192 } | |
| 193 | |
| 194 // Error conditions inside the task body. | |
| 195 | |
| 196 returnErr = nil | |
| 197 rw := execute(goodBody) | |
| 198 So(rw.Code, ShouldEqual, 200) | |
| 199 So(rw.Body.String(), ShouldEqual, "OK\n") | |
| 200 | |
| 201 returnErr = fmt.Errorf("fatal err") | |
| 202 rw = execute(goodBody) | |
| 203 So(rw.Code, ShouldEqual, 202) // no retry! | |
| 204 So(rw.Body.String(), ShouldEqual, "Fatal error: fatal er r\n") | |
| 205 | |
| 206 returnErr = transient.Tag.Apply(fmt.Errorf("transient er r")) | |
| 207 rw = execute(goodBody) | |
| 208 So(rw.Code, ShouldEqual, 500) // 500 for retry | |
| 209 So(rw.Body.String(), ShouldEqual, "Transient error: tran sient err\n") | |
| 210 | |
| 211 // Error conditions when routing the task. | |
| 212 | |
| 213 panicNow = true | |
| 214 | |
| 215 rw = execute("not a json") | |
| 216 So(rw.Code, ShouldEqual, 202) // no retry! | |
| 217 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize") | |
| 218 | |
| 219 rw = execute(`{"type":"google.protobuf.Duration"}`) | |
| 220 So(rw.Code, ShouldEqual, 202) // no retry! | |
| 221 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize") | |
| 222 | |
| 223 rw = execute(`{"type":"google.protobuf.Duration","body": "blah"}`) | |
| 224 So(rw.Code, ShouldEqual, 202) // no retry! | |
| 225 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize") | |
| 226 | |
| 227 rw = execute(`{"type":"unknown.proto.type","body":"{}"}` ) | |
| 228 So(rw.Code, ShouldEqual, 202) // no retry! | |
| 229 So(rw.Body.String(), ShouldStartWith, "Bad payload, can' t deserialize") | |
| 230 }) | |
| 231 }) | |
| 232 } | |
| 233 | |
| 234 func testingContext() context.Context { | |
| 235 c := memory.Use(context.Background()) | |
| 236 c = clock.Set(c, testclock.New(epoch)) | |
| 237 taskqueue.GetTestable(c).CreateQueue("another-q") | |
| 238 return c | |
| 239 } | |
| OLD | NEW |