OLD | NEW |
(Empty) | |
| 1 // Copyright 2017 The LUCI Authors. |
| 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at |
| 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. |
| 14 |
| 15 package tq |
| 16 |
| 17 import ( |
| 18 "bytes" |
| 19 "fmt" |
| 20 "net/http/httptest" |
| 21 "testing" |
| 22 "time" |
| 23 |
| 24 "golang.org/x/net/context" |
| 25 |
| 26 "github.com/golang/protobuf/proto" |
| 27 "github.com/golang/protobuf/ptypes/duration" |
| 28 "github.com/golang/protobuf/ptypes/empty" |
| 29 |
| 30 "github.com/luci/gae/impl/memory" |
| 31 "github.com/luci/gae/service/taskqueue" |
| 32 |
| 33 "github.com/luci/luci-go/common/clock" |
| 34 "github.com/luci/luci-go/common/clock/testclock" |
| 35 "github.com/luci/luci-go/common/retry/transient" |
| 36 "github.com/luci/luci-go/server/router" |
| 37 |
| 38 . "github.com/smartystreets/goconvey/convey" |
| 39 ) |
| 40 |
| 41 var epoch = time.Unix(1500000000, 0).UTC() |
| 42 |
| 43 func TestDispatcher(t *testing.T) { |
| 44 t.Parallel() |
| 45 |
| 46 Convey("With dispatcher", t, func() { |
| 47 ctx := memory.Use(context.Background()) |
| 48 ctx = clock.Set(ctx, testclock.New(epoch)) |
| 49 taskqueue.GetTestable(ctx).CreateQueue("another-q") |
| 50 |
| 51 d := Dispatcher{} |
| 52 r := router.New() |
| 53 |
| 54 installRoutes := func() { |
| 55 d.InstallRoutes(r, router.NewMiddlewareChain(func(c *rou
ter.Context, next router.Handler) { |
| 56 c.Context = ctx |
| 57 next(c) |
| 58 })) |
| 59 } |
| 60 |
| 61 Convey("Single task", func() { |
| 62 var calls []proto.Message |
| 63 handler := func(c context.Context, payload proto.Message
, execCount int) error { |
| 64 calls = append(calls, payload) |
| 65 return nil |
| 66 } |
| 67 |
| 68 // Abuse some well-known proto type to simplify the test
. It's doesn't |
| 69 // matter what proto type we use here as long as it is r
egistered in |
| 70 // protobuf type registry. |
| 71 d.RegisterTask(&duration.Duration{}, handler, "", nil) |
| 72 installRoutes() |
| 73 |
| 74 err := d.AddTask(ctx, &Task{ |
| 75 Payload: &duration.Duration{Seconds: 12
3}, |
| 76 DeduplicationKey: "abc", |
| 77 Title: "abc-def", |
| 78 Delay: 30 * time.Second, |
| 79 }) |
| 80 So(err, ShouldBeNil) |
| 81 |
| 82 // Added the task. |
| 83 expectedPath := "/internal/tasks/default/abc-def" |
| 84 expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c52
0a11dd5244e3399346ac0d3a7" |
| 85 expectedBody := []byte(`{"type":"google.protobuf.Duratio
n","body":"123.000s"}`) |
| 86 tasks := taskqueue.GetTestable(ctx).GetScheduledTasks() |
| 87 So(tasks, ShouldResemble, taskqueue.QueueData{ |
| 88 "default": map[string]*taskqueue.Task{ |
| 89 expectedName: { |
| 90 Path: expectedPath, |
| 91 Payload: expectedBody, |
| 92 Name: expectedName, |
| 93 Method: "POST", |
| 94 ETA: epoch.Add(30 * time.Sec
ond), |
| 95 }, |
| 96 }, |
| 97 "another-q": {}, |
| 98 }) |
| 99 |
| 100 // Readd a task with same dedup key. Should be silently
ignored. |
| 101 err = d.AddTask(ctx, &Task{ |
| 102 Payload: &duration.Duration{Seconds: 12
3}, |
| 103 DeduplicationKey: "abc", |
| 104 }) |
| 105 So(err, ShouldBeNil) |
| 106 |
| 107 // No new tasks. |
| 108 tasks = taskqueue.GetTestable(ctx).GetScheduledTasks() |
| 109 So(len(tasks["default"]), ShouldResemble, 1) |
| 110 |
| 111 // Execute the task. |
| 112 req := httptest.NewRequest("POST", "http://example.com"+
expectedPath, bytes.NewReader(expectedBody)) |
| 113 rw := httptest.NewRecorder() |
| 114 r.ServeHTTP(rw, req) |
| 115 |
| 116 // Executed. |
| 117 So(calls, ShouldResemble, []proto.Message{ |
| 118 &duration.Duration{Seconds: 123}, |
| 119 }) |
| 120 So(rw.Code, ShouldEqual, 200) |
| 121 }) |
| 122 |
| 123 Convey("Many tasks", func() { |
| 124 handler := func(c context.Context, payload proto.Message
, execCount int) error { return nil } |
| 125 d.RegisterTask(&duration.Duration{}, handler, "default",
nil) |
| 126 d.RegisterTask(&empty.Empty{}, handler, "another-q", nil
) |
| 127 installRoutes() |
| 128 |
| 129 t := []*Task{} |
| 130 for i := 0; i < 200; i++ { |
| 131 var task *Task |
| 132 |
| 133 if i%2 == 0 { |
| 134 task = &Task{ |
| 135 DeduplicationKey: fmt.Sprintf("%
d", i/2), |
| 136 Payload: &duration.Dura
tion{}, |
| 137 Delay: time.Duration(
i) * time.Second, |
| 138 } |
| 139 } else { |
| 140 task = &Task{ |
| 141 DeduplicationKey: fmt.Sprintf("%
d", (i-1)/2), |
| 142 Payload: &empty.Empty{}
, |
| 143 Delay: time.Duration(
i) * time.Second, |
| 144 } |
| 145 } |
| 146 |
| 147 t = append(t, task) |
| 148 |
| 149 // Mix in some duplicates. |
| 150 if i > 0 && i%100 == 0 { |
| 151 t = append(t, task) |
| 152 } |
| 153 } |
| 154 err := d.AddTask(ctx, t...) |
| 155 So(err, ShouldBeNil) |
| 156 |
| 157 // Added all the tasks. |
| 158 allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks
() |
| 159 delaysDefault := map[time.Duration]struct{}{} |
| 160 for _, task := range allTasks["default"] { |
| 161 delaysDefault[task.ETA.Sub(epoch)/time.Second] =
struct{}{} |
| 162 } |
| 163 delaysAnotherQ := map[time.Duration]struct{}{} |
| 164 for _, task := range allTasks["another-q"] { |
| 165 delaysAnotherQ[task.ETA.Sub(epoch)/time.Second]
= struct{}{} |
| 166 } |
| 167 So(len(delaysDefault), ShouldEqual, 100) |
| 168 So(len(delaysAnotherQ), ShouldEqual, 100) |
| 169 }) |
| 170 |
| 171 Convey("Execution errors", func() { |
| 172 var returnErr error |
| 173 panicNow := false |
| 174 handler := func(c context.Context, payload proto.Message
, execCount int) error { |
| 175 if panicNow { |
| 176 panic("must not be called") |
| 177 } |
| 178 return returnErr |
| 179 } |
| 180 |
| 181 d.RegisterTask(&duration.Duration{}, handler, "", nil) |
| 182 installRoutes() |
| 183 |
| 184 goodBody := `{"type":"google.protobuf.Duration","body":"
123.000s"}` |
| 185 |
| 186 execute := func(body string) *httptest.ResponseRecorder
{ |
| 187 req := httptest.NewRequest( |
| 188 "POST", |
| 189 "http://example.com/internal/tasks/defau
lt/abc-def", |
| 190 bytes.NewReader([]byte(body))) |
| 191 rw := httptest.NewRecorder() |
| 192 r.ServeHTTP(rw, req) |
| 193 return rw |
| 194 } |
| 195 |
| 196 // Error conditions inside the task body. |
| 197 |
| 198 returnErr = nil |
| 199 rw := execute(goodBody) |
| 200 So(rw.Code, ShouldEqual, 200) |
| 201 So(rw.Body.String(), ShouldEqual, "OK\n") |
| 202 |
| 203 returnErr = fmt.Errorf("fatal err") |
| 204 rw = execute(goodBody) |
| 205 So(rw.Code, ShouldEqual, 202) // no retry! |
| 206 So(rw.Body.String(), ShouldEqual, "Fatal error: fatal er
r\n") |
| 207 |
| 208 returnErr = transient.Tag.Apply(fmt.Errorf("transient er
r")) |
| 209 rw = execute(goodBody) |
| 210 So(rw.Code, ShouldEqual, 500) // 500 for retry |
| 211 So(rw.Body.String(), ShouldEqual, "Transient error: tran
sient err\n") |
| 212 |
| 213 // Error conditions when routing the task. |
| 214 |
| 215 panicNow = true |
| 216 |
| 217 rw = execute("not a json") |
| 218 So(rw.Code, ShouldEqual, 202) // no retry! |
| 219 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") |
| 220 |
| 221 rw = execute(`{"type":"google.protobuf.Duration"}`) |
| 222 So(rw.Code, ShouldEqual, 202) // no retry! |
| 223 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") |
| 224 |
| 225 rw = execute(`{"type":"google.protobuf.Duration","body":
"blah"}`) |
| 226 So(rw.Code, ShouldEqual, 202) // no retry! |
| 227 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") |
| 228 |
| 229 rw = execute(`{"type":"unknown.proto.type","body":"{}"}`
) |
| 230 So(rw.Code, ShouldEqual, 202) // no retry! |
| 231 So(rw.Body.String(), ShouldStartWith, "Bad payload, can'
t deserialize") |
| 232 }) |
| 233 }) |
| 234 } |
OLD | NEW |