Chromium Code Reviews| Index: scheduler/appengine/engine/tq/tq_test.go |
| diff --git a/scheduler/appengine/engine/tq/tq_test.go b/scheduler/appengine/engine/tq/tq_test.go |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..61f8adfdef389d84fb2b18b5b151d0f602c14137 |
| --- /dev/null |
| +++ b/scheduler/appengine/engine/tq/tq_test.go |
| @@ -0,0 +1,239 @@ |
| +// Copyright 2017 The LUCI Authors. |
| +// |
| +// Licensed under the Apache License, Version 2.0 (the "License"); |
| +// you may not use this file except in compliance with the License. |
| +// You may obtain a copy of the License at |
| +// |
| +// http://www.apache.org/licenses/LICENSE-2.0 |
| +// |
| +// Unless required by applicable law or agreed to in writing, software |
| +// distributed under the License is distributed on an "AS IS" BASIS, |
| +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| +// See the License for the specific language governing permissions and |
| +// limitations under the License. |
| + |
| +package tq |
| + |
| +import ( |
| + "bytes" |
| + "fmt" |
| + "net/http/httptest" |
| + "testing" |
| + "time" |
| + |
| + "golang.org/x/net/context" |
| + |
| + "github.com/golang/protobuf/proto" |
| + "github.com/golang/protobuf/ptypes/duration" |
| + "github.com/golang/protobuf/ptypes/empty" |
| + |
| + "github.com/luci/gae/impl/memory" |
| + "github.com/luci/gae/service/taskqueue" |
| + |
| + "github.com/luci/luci-go/common/clock" |
| + "github.com/luci/luci-go/common/clock/testclock" |
| + "github.com/luci/luci-go/common/retry/transient" |
| + "github.com/luci/luci-go/server/router" |
| + |
| + . "github.com/smartystreets/goconvey/convey" |
| +) |
| + |
| +var epoch = time.Unix(1500000000, 0).UTC() |
| + |
| +func TestDispatcher(t *testing.T) { |
| + t.Parallel() |
| + |
| + Convey("With dispatcher", t, func() { |
| + ctx := testingContext() |
|
tandrii(chromium)
2017/07/28 09:47:17
i'd move its body here so it's obvious that you ar
Vadim Sh.
2017/07/28 19:20:30
Done.
|
| + |
| + d := Dispatcher{} |
| + r := router.New() |
| + |
| + installRoutes := func() { |
| + d.InstallRoutes(r, router.NewMiddlewareChain(func(c *router.Context, next router.Handler) { |
| + c.Context = ctx |
| + next(c) |
| + })) |
| + } |
| + |
| + Convey("Single task", func() { |
| + var calls []proto.Message |
| + handler := func(c context.Context, payload proto.Message, execCount int) error { |
| + calls = append(calls, payload) |
| + return nil |
| + } |
| + |
| + // Abuse some well-known proto type to simplify the test. It's doesn't |
| + // matter what proto type we use here as long as it is registered in |
| + // protobuf type registry. |
| + d.RegisterTask(&duration.Duration{}, handler, "", nil) |
| + installRoutes() |
| + |
| + err := d.AddTask(ctx, &Task{ |
| + Payload: &duration.Duration{Seconds: 123}, |
| + DeduplicationKey: "abc", |
| + Title: "abc-def", |
| + Delay: 30 * time.Second, |
| + }) |
| + So(err, ShouldBeNil) |
| + |
| + // Added the task. |
| + expectedPath := "/internal/tasks/default/abc-def" |
| + expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c520a11dd5244e3399346ac0d3a7" |
| + expectedBody := []byte(`{"type":"google.protobuf.Duration","body":"123.000s"}`) |
| + tasks := taskqueue.GetTestable(ctx).GetScheduledTasks() |
| + So(tasks, ShouldResemble, taskqueue.QueueData{ |
| + "default": map[string]*taskqueue.Task{ |
| + expectedName: { |
| + Path: expectedPath, |
| + Payload: expectedBody, |
| + Name: expectedName, |
| + Method: "POST", |
| + ETA: epoch.Add(30 * time.Second), |
| + }, |
| + }, |
| + "another-q": {}, |
| + }) |
| + |
| + // Readd a task with same dedup key. Should be silently ignored. |
| + err = d.AddTask(ctx, &Task{ |
| + Payload: &duration.Duration{Seconds: 123}, |
| + DeduplicationKey: "abc", |
| + }) |
| + So(err, ShouldBeNil) |
| + |
| + // No new tasks. |
| + tasks = taskqueue.GetTestable(ctx).GetScheduledTasks() |
| + So(len(tasks["default"]), ShouldResemble, 1) |
| + |
| + // Execute the task. |
| + req := httptest.NewRequest("POST", "http://example.com"+expectedPath, bytes.NewReader(expectedBody)) |
| + rw := httptest.NewRecorder() |
| + r.ServeHTTP(rw, req) |
| + |
| + // Executed. |
| + So(calls, ShouldResemble, []proto.Message{ |
| + &duration.Duration{Seconds: 123}, |
| + }) |
| + So(rw.Code, ShouldEqual, 200) |
| + }) |
| + |
| + Convey("Many tasks", func() { |
| + handler := func(c context.Context, payload proto.Message, execCount int) error { return nil } |
| + d.RegisterTask(&duration.Duration{}, handler, "default", nil) |
| + d.RegisterTask(&empty.Empty{}, handler, "another-q", nil) |
| + installRoutes() |
| + |
| + t := []*Task{} |
| + for i := 0; i < 200; i++ { |
| + var task *Task |
| + |
| + if i%2 == 0 { |
| + task = &Task{ |
| + DeduplicationKey: fmt.Sprintf("%d", i/2), |
| + Payload: &duration.Duration{}, |
| + Delay: time.Duration(i) * time.Second, |
| + } |
| + } else { |
| + task = &Task{ |
| + DeduplicationKey: fmt.Sprintf("%d", (i-1)/2), |
| + Payload: &empty.Empty{}, |
| + Delay: time.Duration(i) * time.Second, |
| + } |
| + } |
| + |
| + t = append(t, task) |
| + |
| + // Mix in some duplicates. |
| + if i > 0 && i%100 == 0 { |
| + t = append(t, task) |
| + } |
| + } |
| + err := d.AddTask(ctx, t...) |
| + So(err, ShouldBeNil) |
| + |
| + // Added all the tasks. |
| + allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks() |
| + delaysDefault := map[time.Duration]struct{}{} |
| + for _, task := range allTasks["default"] { |
| + delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{} |
| + } |
| + delaysAnotherQ := map[time.Duration]struct{}{} |
| + for _, task := range allTasks["another-q"] { |
| + delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{} |
| + } |
| + So(len(delaysDefault), ShouldEqual, 100) |
| + So(len(delaysAnotherQ), ShouldEqual, 100) |
| + }) |
| + |
| + Convey("Execution errors", func() { |
| + var returnErr error |
| + panicNow := false |
| + handler := func(c context.Context, payload proto.Message, execCount int) error { |
| + if panicNow { |
| + panic("must not be called") |
| + } |
| + return returnErr |
| + } |
| + |
| + d.RegisterTask(&duration.Duration{}, handler, "", nil) |
| + installRoutes() |
| + |
| + goodBody := `{"type":"google.protobuf.Duration","body":"123.000s"}` |
| + |
| + execute := func(body string) *httptest.ResponseRecorder { |
| + req := httptest.NewRequest( |
| + "POST", |
| + "http://example.com/internal/tasks/default/abc-def", |
| + bytes.NewReader([]byte(body))) |
| + rw := httptest.NewRecorder() |
| + r.ServeHTTP(rw, req) |
| + return rw |
| + } |
| + |
| + // Error conditions inside the task body. |
| + |
| + returnErr = nil |
| + rw := execute(goodBody) |
| + So(rw.Code, ShouldEqual, 200) |
| + So(rw.Body.String(), ShouldEqual, "OK\n") |
| + |
| + returnErr = fmt.Errorf("fatal err") |
| + rw = execute(goodBody) |
| + So(rw.Code, ShouldEqual, 202) // no retry! |
| + So(rw.Body.String(), ShouldEqual, "Fatal error: fatal err\n") |
| + |
| + returnErr = transient.Tag.Apply(fmt.Errorf("transient err")) |
| + rw = execute(goodBody) |
| + So(rw.Code, ShouldEqual, 500) // 500 for retry |
| + So(rw.Body.String(), ShouldEqual, "Transient error: transient err\n") |
| + |
| + // Error conditions when routing the task. |
| + |
| + panicNow = true |
| + |
| + rw = execute("not a json") |
| + So(rw.Code, ShouldEqual, 202) // no retry! |
| + So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize") |
| + |
| + rw = execute(`{"type":"google.protobuf.Duration"}`) |
| + So(rw.Code, ShouldEqual, 202) // no retry! |
| + So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize") |
| + |
| + rw = execute(`{"type":"google.protobuf.Duration","body":"blah"}`) |
| + So(rw.Code, ShouldEqual, 202) // no retry! |
| + So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize") |
| + |
| + rw = execute(`{"type":"unknown.proto.type","body":"{}"}`) |
| + So(rw.Code, ShouldEqual, 202) // no retry! |
| + So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize") |
| + }) |
| + }) |
| +} |
| + |
| +func testingContext() context.Context { |
| + c := memory.Use(context.Background()) |
| + c = clock.Set(c, testclock.New(epoch)) |
| + taskqueue.GetTestable(c).CreateQueue("another-q") |
| + return c |
| +} |