Index: scheduler/appengine/engine/tq/tq_test.go |
diff --git a/scheduler/appengine/engine/tq/tq_test.go b/scheduler/appengine/engine/tq/tq_test.go |
new file mode 100644 |
index 0000000000000000000000000000000000000000..e3176d07a3c001f861b6c414fd49aff5f7115153 |
--- /dev/null |
+++ b/scheduler/appengine/engine/tq/tq_test.go |
@@ -0,0 +1,234 @@ |
+// Copyright 2017 The LUCI Authors. |
+// |
+// Licensed under the Apache License, Version 2.0 (the "License"); |
+// you may not use this file except in compliance with the License. |
+// You may obtain a copy of the License at |
+// |
+// http://www.apache.org/licenses/LICENSE-2.0 |
+// |
+// Unless required by applicable law or agreed to in writing, software |
+// distributed under the License is distributed on an "AS IS" BASIS, |
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
+// See the License for the specific language governing permissions and |
+// limitations under the License. |
+ |
+package tq |
+ |
+import ( |
+ "bytes" |
+ "fmt" |
+ "net/http/httptest" |
+ "testing" |
+ "time" |
+ |
+ "golang.org/x/net/context" |
+ |
+ "github.com/golang/protobuf/proto" |
+ "github.com/golang/protobuf/ptypes/duration" |
+ "github.com/golang/protobuf/ptypes/empty" |
+ |
+ "github.com/luci/gae/impl/memory" |
+ "github.com/luci/gae/service/taskqueue" |
+ |
+ "github.com/luci/luci-go/common/clock" |
+ "github.com/luci/luci-go/common/clock/testclock" |
+ "github.com/luci/luci-go/common/retry/transient" |
+ "github.com/luci/luci-go/server/router" |
+ |
+ . "github.com/smartystreets/goconvey/convey" |
+) |
+ |
+var epoch = time.Unix(1500000000, 0).UTC() |
+ |
+func TestDispatcher(t *testing.T) { |
+ t.Parallel() |
+ |
+ Convey("With dispatcher", t, func() { |
+ ctx := memory.Use(context.Background()) |
+ ctx = clock.Set(ctx, testclock.New(epoch)) |
+ taskqueue.GetTestable(ctx).CreateQueue("another-q") |
+ |
+ d := Dispatcher{} |
+ r := router.New() |
+ |
+ installRoutes := func() { |
+ d.InstallRoutes(r, router.NewMiddlewareChain(func(c *router.Context, next router.Handler) { |
+ c.Context = ctx |
+ next(c) |
+ })) |
+ } |
+ |
+ Convey("Single task", func() { |
+ var calls []proto.Message |
+ handler := func(c context.Context, payload proto.Message, execCount int) error { |
+ calls = append(calls, payload) |
+ return nil |
+ } |
+ |
+ // Abuse some well-known proto type to simplify the test. It's doesn't |
+ // matter what proto type we use here as long as it is registered in |
+ // protobuf type registry. |
+ d.RegisterTask(&duration.Duration{}, handler, "", nil) |
+ installRoutes() |
+ |
+ err := d.AddTask(ctx, &Task{ |
+ Payload: &duration.Duration{Seconds: 123}, |
+ DeduplicationKey: "abc", |
+ Title: "abc-def", |
+ Delay: 30 * time.Second, |
+ }) |
+ So(err, ShouldBeNil) |
+ |
+ // Added the task. |
+ expectedPath := "/internal/tasks/default/abc-def" |
+ expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c520a11dd5244e3399346ac0d3a7" |
+ expectedBody := []byte(`{"type":"google.protobuf.Duration","body":"123.000s"}`) |
+ tasks := taskqueue.GetTestable(ctx).GetScheduledTasks() |
+ So(tasks, ShouldResemble, taskqueue.QueueData{ |
+ "default": map[string]*taskqueue.Task{ |
+ expectedName: { |
+ Path: expectedPath, |
+ Payload: expectedBody, |
+ Name: expectedName, |
+ Method: "POST", |
+ ETA: epoch.Add(30 * time.Second), |
+ }, |
+ }, |
+ "another-q": {}, |
+ }) |
+ |
+ // Readd a task with same dedup key. Should be silently ignored. |
+ err = d.AddTask(ctx, &Task{ |
+ Payload: &duration.Duration{Seconds: 123}, |
+ DeduplicationKey: "abc", |
+ }) |
+ So(err, ShouldBeNil) |
+ |
+ // No new tasks. |
+ tasks = taskqueue.GetTestable(ctx).GetScheduledTasks() |
+ So(len(tasks["default"]), ShouldResemble, 1) |
+ |
+ // Execute the task. |
+ req := httptest.NewRequest("POST", "http://example.com"+expectedPath, bytes.NewReader(expectedBody)) |
+ rw := httptest.NewRecorder() |
+ r.ServeHTTP(rw, req) |
+ |
+ // Executed. |
+ So(calls, ShouldResemble, []proto.Message{ |
+ &duration.Duration{Seconds: 123}, |
+ }) |
+ So(rw.Code, ShouldEqual, 200) |
+ }) |
+ |
+ Convey("Many tasks", func() { |
+ handler := func(c context.Context, payload proto.Message, execCount int) error { return nil } |
+ d.RegisterTask(&duration.Duration{}, handler, "default", nil) |
+ d.RegisterTask(&empty.Empty{}, handler, "another-q", nil) |
+ installRoutes() |
+ |
+ t := []*Task{} |
+ for i := 0; i < 200; i++ { |
+ var task *Task |
+ |
+ if i%2 == 0 { |
+ task = &Task{ |
+ DeduplicationKey: fmt.Sprintf("%d", i/2), |
+ Payload: &duration.Duration{}, |
+ Delay: time.Duration(i) * time.Second, |
+ } |
+ } else { |
+ task = &Task{ |
+ DeduplicationKey: fmt.Sprintf("%d", (i-1)/2), |
+ Payload: &empty.Empty{}, |
+ Delay: time.Duration(i) * time.Second, |
+ } |
+ } |
+ |
+ t = append(t, task) |
+ |
+ // Mix in some duplicates. |
+ if i > 0 && i%100 == 0 { |
+ t = append(t, task) |
+ } |
+ } |
+ err := d.AddTask(ctx, t...) |
+ So(err, ShouldBeNil) |
+ |
+ // Added all the tasks. |
+ allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks() |
+ delaysDefault := map[time.Duration]struct{}{} |
+ for _, task := range allTasks["default"] { |
+ delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{} |
+ } |
+ delaysAnotherQ := map[time.Duration]struct{}{} |
+ for _, task := range allTasks["another-q"] { |
+ delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{} |
+ } |
+ So(len(delaysDefault), ShouldEqual, 100) |
+ So(len(delaysAnotherQ), ShouldEqual, 100) |
+ }) |
+ |
+ Convey("Execution errors", func() { |
+ var returnErr error |
+ panicNow := false |
+ handler := func(c context.Context, payload proto.Message, execCount int) error { |
+ if panicNow { |
+ panic("must not be called") |
+ } |
+ return returnErr |
+ } |
+ |
+ d.RegisterTask(&duration.Duration{}, handler, "", nil) |
+ installRoutes() |
+ |
+ goodBody := `{"type":"google.protobuf.Duration","body":"123.000s"}` |
+ |
+ execute := func(body string) *httptest.ResponseRecorder { |
+ req := httptest.NewRequest( |
+ "POST", |
+ "http://example.com/internal/tasks/default/abc-def", |
+ bytes.NewReader([]byte(body))) |
+ rw := httptest.NewRecorder() |
+ r.ServeHTTP(rw, req) |
+ return rw |
+ } |
+ |
+ // Error conditions inside the task body. |
+ |
+ returnErr = nil |
+ rw := execute(goodBody) |
+ So(rw.Code, ShouldEqual, 200) |
+ So(rw.Body.String(), ShouldEqual, "OK\n") |
+ |
+ returnErr = fmt.Errorf("fatal err") |
+ rw = execute(goodBody) |
+ So(rw.Code, ShouldEqual, 202) // no retry! |
+ So(rw.Body.String(), ShouldEqual, "Fatal error: fatal err\n") |
+ |
+ returnErr = transient.Tag.Apply(fmt.Errorf("transient err")) |
+ rw = execute(goodBody) |
+ So(rw.Code, ShouldEqual, 500) // 500 for retry |
+ So(rw.Body.String(), ShouldEqual, "Transient error: transient err\n") |
+ |
+ // Error conditions when routing the task. |
+ |
+ panicNow = true |
+ |
+ rw = execute("not a json") |
+ So(rw.Code, ShouldEqual, 202) // no retry! |
+ So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize") |
+ |
+ rw = execute(`{"type":"google.protobuf.Duration"}`) |
+ So(rw.Code, ShouldEqual, 202) // no retry! |
+ So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize") |
+ |
+ rw = execute(`{"type":"google.protobuf.Duration","body":"blah"}`) |
+ So(rw.Code, ShouldEqual, 202) // no retry! |
+ So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize") |
+ |
+ rw = execute(`{"type":"unknown.proto.type","body":"{}"}`) |
+ So(rw.Code, ShouldEqual, 202) // no retry! |
+ So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize") |
+ }) |
+ }) |
+} |