Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(87)

Unified Diff: scheduler/appengine/engine/tq/tq_test.go

Issue 2981043002: Add a task queue task router to reduce amount of boilerplate. (Closed)
Patch Set: nits Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « scheduler/appengine/engine/tq/tq.go ('k') | no next file » | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: scheduler/appengine/engine/tq/tq_test.go
diff --git a/scheduler/appengine/engine/tq/tq_test.go b/scheduler/appengine/engine/tq/tq_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..e3176d07a3c001f861b6c414fd49aff5f7115153
--- /dev/null
+++ b/scheduler/appengine/engine/tq/tq_test.go
@@ -0,0 +1,234 @@
+// Copyright 2017 The LUCI Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tq
+
+import (
+ "bytes"
+ "fmt"
+ "net/http/httptest"
+ "testing"
+ "time"
+
+ "golang.org/x/net/context"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes/duration"
+ "github.com/golang/protobuf/ptypes/empty"
+
+ "github.com/luci/gae/impl/memory"
+ "github.com/luci/gae/service/taskqueue"
+
+ "github.com/luci/luci-go/common/clock"
+ "github.com/luci/luci-go/common/clock/testclock"
+ "github.com/luci/luci-go/common/retry/transient"
+ "github.com/luci/luci-go/server/router"
+
+ . "github.com/smartystreets/goconvey/convey"
+)
+
+var epoch = time.Unix(1500000000, 0).UTC()
+
+func TestDispatcher(t *testing.T) {
+ t.Parallel()
+
+ Convey("With dispatcher", t, func() {
+ ctx := memory.Use(context.Background())
+ ctx = clock.Set(ctx, testclock.New(epoch))
+ taskqueue.GetTestable(ctx).CreateQueue("another-q")
+
+ d := Dispatcher{}
+ r := router.New()
+
+ installRoutes := func() {
+ d.InstallRoutes(r, router.NewMiddlewareChain(func(c *router.Context, next router.Handler) {
+ c.Context = ctx
+ next(c)
+ }))
+ }
+
+ Convey("Single task", func() {
+ var calls []proto.Message
+ handler := func(c context.Context, payload proto.Message, execCount int) error {
+ calls = append(calls, payload)
+ return nil
+ }
+
+ // Abuse some well-known proto type to simplify the test. It's doesn't
+ // matter what proto type we use here as long as it is registered in
+ // protobuf type registry.
+ d.RegisterTask(&duration.Duration{}, handler, "", nil)
+ installRoutes()
+
+ err := d.AddTask(ctx, &Task{
+ Payload: &duration.Duration{Seconds: 123},
+ DeduplicationKey: "abc",
+ Title: "abc-def",
+ Delay: 30 * time.Second,
+ })
+ So(err, ShouldBeNil)
+
+ // Added the task.
+ expectedPath := "/internal/tasks/default/abc-def"
+ expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c520a11dd5244e3399346ac0d3a7"
+ expectedBody := []byte(`{"type":"google.protobuf.Duration","body":"123.000s"}`)
+ tasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
+ So(tasks, ShouldResemble, taskqueue.QueueData{
+ "default": map[string]*taskqueue.Task{
+ expectedName: {
+ Path: expectedPath,
+ Payload: expectedBody,
+ Name: expectedName,
+ Method: "POST",
+ ETA: epoch.Add(30 * time.Second),
+ },
+ },
+ "another-q": {},
+ })
+
+ // Readd a task with same dedup key. Should be silently ignored.
+ err = d.AddTask(ctx, &Task{
+ Payload: &duration.Duration{Seconds: 123},
+ DeduplicationKey: "abc",
+ })
+ So(err, ShouldBeNil)
+
+ // No new tasks.
+ tasks = taskqueue.GetTestable(ctx).GetScheduledTasks()
+ So(len(tasks["default"]), ShouldResemble, 1)
+
+ // Execute the task.
+ req := httptest.NewRequest("POST", "http://example.com"+expectedPath, bytes.NewReader(expectedBody))
+ rw := httptest.NewRecorder()
+ r.ServeHTTP(rw, req)
+
+ // Executed.
+ So(calls, ShouldResemble, []proto.Message{
+ &duration.Duration{Seconds: 123},
+ })
+ So(rw.Code, ShouldEqual, 200)
+ })
+
+ Convey("Many tasks", func() {
+ handler := func(c context.Context, payload proto.Message, execCount int) error { return nil }
+ d.RegisterTask(&duration.Duration{}, handler, "default", nil)
+ d.RegisterTask(&empty.Empty{}, handler, "another-q", nil)
+ installRoutes()
+
+ t := []*Task{}
+ for i := 0; i < 200; i++ {
+ var task *Task
+
+ if i%2 == 0 {
+ task = &Task{
+ DeduplicationKey: fmt.Sprintf("%d", i/2),
+ Payload: &duration.Duration{},
+ Delay: time.Duration(i) * time.Second,
+ }
+ } else {
+ task = &Task{
+ DeduplicationKey: fmt.Sprintf("%d", (i-1)/2),
+ Payload: &empty.Empty{},
+ Delay: time.Duration(i) * time.Second,
+ }
+ }
+
+ t = append(t, task)
+
+ // Mix in some duplicates.
+ if i > 0 && i%100 == 0 {
+ t = append(t, task)
+ }
+ }
+ err := d.AddTask(ctx, t...)
+ So(err, ShouldBeNil)
+
+ // Added all the tasks.
+ allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
+ delaysDefault := map[time.Duration]struct{}{}
+ for _, task := range allTasks["default"] {
+ delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{}
+ }
+ delaysAnotherQ := map[time.Duration]struct{}{}
+ for _, task := range allTasks["another-q"] {
+ delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{}
+ }
+ So(len(delaysDefault), ShouldEqual, 100)
+ So(len(delaysAnotherQ), ShouldEqual, 100)
+ })
+
+ Convey("Execution errors", func() {
+ var returnErr error
+ panicNow := false
+ handler := func(c context.Context, payload proto.Message, execCount int) error {
+ if panicNow {
+ panic("must not be called")
+ }
+ return returnErr
+ }
+
+ d.RegisterTask(&duration.Duration{}, handler, "", nil)
+ installRoutes()
+
+ goodBody := `{"type":"google.protobuf.Duration","body":"123.000s"}`
+
+ execute := func(body string) *httptest.ResponseRecorder {
+ req := httptest.NewRequest(
+ "POST",
+ "http://example.com/internal/tasks/default/abc-def",
+ bytes.NewReader([]byte(body)))
+ rw := httptest.NewRecorder()
+ r.ServeHTTP(rw, req)
+ return rw
+ }
+
+ // Error conditions inside the task body.
+
+ returnErr = nil
+ rw := execute(goodBody)
+ So(rw.Code, ShouldEqual, 200)
+ So(rw.Body.String(), ShouldEqual, "OK\n")
+
+ returnErr = fmt.Errorf("fatal err")
+ rw = execute(goodBody)
+ So(rw.Code, ShouldEqual, 202) // no retry!
+ So(rw.Body.String(), ShouldEqual, "Fatal error: fatal err\n")
+
+ returnErr = transient.Tag.Apply(fmt.Errorf("transient err"))
+ rw = execute(goodBody)
+ So(rw.Code, ShouldEqual, 500) // 500 for retry
+ So(rw.Body.String(), ShouldEqual, "Transient error: transient err\n")
+
+ // Error conditions when routing the task.
+
+ panicNow = true
+
+ rw = execute("not a json")
+ So(rw.Code, ShouldEqual, 202) // no retry!
+ So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
+
+ rw = execute(`{"type":"google.protobuf.Duration"}`)
+ So(rw.Code, ShouldEqual, 202) // no retry!
+ So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
+
+ rw = execute(`{"type":"google.protobuf.Duration","body":"blah"}`)
+ So(rw.Code, ShouldEqual, 202) // no retry!
+ So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
+
+ rw = execute(`{"type":"unknown.proto.type","body":"{}"}`)
+ So(rw.Code, ShouldEqual, 202) // no retry!
+ So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
+ })
+ })
+}
« no previous file with comments | « scheduler/appengine/engine/tq/tq.go ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698