| Index: scheduler/appengine/engine/tq/tq_test.go
|
| diff --git a/scheduler/appengine/engine/tq/tq_test.go b/scheduler/appengine/engine/tq/tq_test.go
|
| deleted file mode 100644
|
| index e3176d07a3c001f861b6c414fd49aff5f7115153..0000000000000000000000000000000000000000
|
| --- a/scheduler/appengine/engine/tq/tq_test.go
|
| +++ /dev/null
|
| @@ -1,234 +0,0 @@
|
| -// Copyright 2017 The LUCI Authors.
|
| -//
|
| -// Licensed under the Apache License, Version 2.0 (the "License");
|
| -// you may not use this file except in compliance with the License.
|
| -// You may obtain a copy of the License at
|
| -//
|
| -// http://www.apache.org/licenses/LICENSE-2.0
|
| -//
|
| -// Unless required by applicable law or agreed to in writing, software
|
| -// distributed under the License is distributed on an "AS IS" BASIS,
|
| -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| -// See the License for the specific language governing permissions and
|
| -// limitations under the License.
|
| -
|
| -package tq
|
| -
|
| -import (
|
| - "bytes"
|
| - "fmt"
|
| - "net/http/httptest"
|
| - "testing"
|
| - "time"
|
| -
|
| - "golang.org/x/net/context"
|
| -
|
| - "github.com/golang/protobuf/proto"
|
| - "github.com/golang/protobuf/ptypes/duration"
|
| - "github.com/golang/protobuf/ptypes/empty"
|
| -
|
| - "github.com/luci/gae/impl/memory"
|
| - "github.com/luci/gae/service/taskqueue"
|
| -
|
| - "github.com/luci/luci-go/common/clock"
|
| - "github.com/luci/luci-go/common/clock/testclock"
|
| - "github.com/luci/luci-go/common/retry/transient"
|
| - "github.com/luci/luci-go/server/router"
|
| -
|
| - . "github.com/smartystreets/goconvey/convey"
|
| -)
|
| -
|
| -var epoch = time.Unix(1500000000, 0).UTC()
|
| -
|
| -func TestDispatcher(t *testing.T) {
|
| - t.Parallel()
|
| -
|
| - Convey("With dispatcher", t, func() {
|
| - ctx := memory.Use(context.Background())
|
| - ctx = clock.Set(ctx, testclock.New(epoch))
|
| - taskqueue.GetTestable(ctx).CreateQueue("another-q")
|
| -
|
| - d := Dispatcher{}
|
| - r := router.New()
|
| -
|
| - installRoutes := func() {
|
| - d.InstallRoutes(r, router.NewMiddlewareChain(func(c *router.Context, next router.Handler) {
|
| - c.Context = ctx
|
| - next(c)
|
| - }))
|
| - }
|
| -
|
| - Convey("Single task", func() {
|
| - var calls []proto.Message
|
| - handler := func(c context.Context, payload proto.Message, execCount int) error {
|
| - calls = append(calls, payload)
|
| - return nil
|
| - }
|
| -
|
| - // Abuse some well-known proto type to simplify the test. It's doesn't
|
| - // matter what proto type we use here as long as it is registered in
|
| - // protobuf type registry.
|
| - d.RegisterTask(&duration.Duration{}, handler, "", nil)
|
| - installRoutes()
|
| -
|
| - err := d.AddTask(ctx, &Task{
|
| - Payload: &duration.Duration{Seconds: 123},
|
| - DeduplicationKey: "abc",
|
| - Title: "abc-def",
|
| - Delay: 30 * time.Second,
|
| - })
|
| - So(err, ShouldBeNil)
|
| -
|
| - // Added the task.
|
| - expectedPath := "/internal/tasks/default/abc-def"
|
| - expectedName := "afc6f8271b8598ee04e359916e6c584a9bc3c520a11dd5244e3399346ac0d3a7"
|
| - expectedBody := []byte(`{"type":"google.protobuf.Duration","body":"123.000s"}`)
|
| - tasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
|
| - So(tasks, ShouldResemble, taskqueue.QueueData{
|
| - "default": map[string]*taskqueue.Task{
|
| - expectedName: {
|
| - Path: expectedPath,
|
| - Payload: expectedBody,
|
| - Name: expectedName,
|
| - Method: "POST",
|
| - ETA: epoch.Add(30 * time.Second),
|
| - },
|
| - },
|
| - "another-q": {},
|
| - })
|
| -
|
| - // Readd a task with same dedup key. Should be silently ignored.
|
| - err = d.AddTask(ctx, &Task{
|
| - Payload: &duration.Duration{Seconds: 123},
|
| - DeduplicationKey: "abc",
|
| - })
|
| - So(err, ShouldBeNil)
|
| -
|
| - // No new tasks.
|
| - tasks = taskqueue.GetTestable(ctx).GetScheduledTasks()
|
| - So(len(tasks["default"]), ShouldResemble, 1)
|
| -
|
| - // Execute the task.
|
| - req := httptest.NewRequest("POST", "http://example.com"+expectedPath, bytes.NewReader(expectedBody))
|
| - rw := httptest.NewRecorder()
|
| - r.ServeHTTP(rw, req)
|
| -
|
| - // Executed.
|
| - So(calls, ShouldResemble, []proto.Message{
|
| - &duration.Duration{Seconds: 123},
|
| - })
|
| - So(rw.Code, ShouldEqual, 200)
|
| - })
|
| -
|
| - Convey("Many tasks", func() {
|
| - handler := func(c context.Context, payload proto.Message, execCount int) error { return nil }
|
| - d.RegisterTask(&duration.Duration{}, handler, "default", nil)
|
| - d.RegisterTask(&empty.Empty{}, handler, "another-q", nil)
|
| - installRoutes()
|
| -
|
| - t := []*Task{}
|
| - for i := 0; i < 200; i++ {
|
| - var task *Task
|
| -
|
| - if i%2 == 0 {
|
| - task = &Task{
|
| - DeduplicationKey: fmt.Sprintf("%d", i/2),
|
| - Payload: &duration.Duration{},
|
| - Delay: time.Duration(i) * time.Second,
|
| - }
|
| - } else {
|
| - task = &Task{
|
| - DeduplicationKey: fmt.Sprintf("%d", (i-1)/2),
|
| - Payload: &empty.Empty{},
|
| - Delay: time.Duration(i) * time.Second,
|
| - }
|
| - }
|
| -
|
| - t = append(t, task)
|
| -
|
| - // Mix in some duplicates.
|
| - if i > 0 && i%100 == 0 {
|
| - t = append(t, task)
|
| - }
|
| - }
|
| - err := d.AddTask(ctx, t...)
|
| - So(err, ShouldBeNil)
|
| -
|
| - // Added all the tasks.
|
| - allTasks := taskqueue.GetTestable(ctx).GetScheduledTasks()
|
| - delaysDefault := map[time.Duration]struct{}{}
|
| - for _, task := range allTasks["default"] {
|
| - delaysDefault[task.ETA.Sub(epoch)/time.Second] = struct{}{}
|
| - }
|
| - delaysAnotherQ := map[time.Duration]struct{}{}
|
| - for _, task := range allTasks["another-q"] {
|
| - delaysAnotherQ[task.ETA.Sub(epoch)/time.Second] = struct{}{}
|
| - }
|
| - So(len(delaysDefault), ShouldEqual, 100)
|
| - So(len(delaysAnotherQ), ShouldEqual, 100)
|
| - })
|
| -
|
| - Convey("Execution errors", func() {
|
| - var returnErr error
|
| - panicNow := false
|
| - handler := func(c context.Context, payload proto.Message, execCount int) error {
|
| - if panicNow {
|
| - panic("must not be called")
|
| - }
|
| - return returnErr
|
| - }
|
| -
|
| - d.RegisterTask(&duration.Duration{}, handler, "", nil)
|
| - installRoutes()
|
| -
|
| - goodBody := `{"type":"google.protobuf.Duration","body":"123.000s"}`
|
| -
|
| - execute := func(body string) *httptest.ResponseRecorder {
|
| - req := httptest.NewRequest(
|
| - "POST",
|
| - "http://example.com/internal/tasks/default/abc-def",
|
| - bytes.NewReader([]byte(body)))
|
| - rw := httptest.NewRecorder()
|
| - r.ServeHTTP(rw, req)
|
| - return rw
|
| - }
|
| -
|
| - // Error conditions inside the task body.
|
| -
|
| - returnErr = nil
|
| - rw := execute(goodBody)
|
| - So(rw.Code, ShouldEqual, 200)
|
| - So(rw.Body.String(), ShouldEqual, "OK\n")
|
| -
|
| - returnErr = fmt.Errorf("fatal err")
|
| - rw = execute(goodBody)
|
| - So(rw.Code, ShouldEqual, 202) // no retry!
|
| - So(rw.Body.String(), ShouldEqual, "Fatal error: fatal err\n")
|
| -
|
| - returnErr = transient.Tag.Apply(fmt.Errorf("transient err"))
|
| - rw = execute(goodBody)
|
| - So(rw.Code, ShouldEqual, 500) // 500 for retry
|
| - So(rw.Body.String(), ShouldEqual, "Transient error: transient err\n")
|
| -
|
| - // Error conditions when routing the task.
|
| -
|
| - panicNow = true
|
| -
|
| - rw = execute("not a json")
|
| - So(rw.Code, ShouldEqual, 202) // no retry!
|
| - So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
|
| -
|
| - rw = execute(`{"type":"google.protobuf.Duration"}`)
|
| - So(rw.Code, ShouldEqual, 202) // no retry!
|
| - So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
|
| -
|
| - rw = execute(`{"type":"google.protobuf.Duration","body":"blah"}`)
|
| - So(rw.Code, ShouldEqual, 202) // no retry!
|
| - So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
|
| -
|
| - rw = execute(`{"type":"unknown.proto.type","body":"{}"}`)
|
| - So(rw.Code, ShouldEqual, 202) // no retry!
|
| - So(rw.Body.String(), ShouldStartWith, "Bad payload, can't deserialize")
|
| - })
|
| - })
|
| -}
|
|
|