Index: appengine/logdog/coordinator/backend/archiveCron_test.go |
diff --git a/appengine/logdog/coordinator/backend/archiveCron_test.go b/appengine/logdog/coordinator/backend/archiveCron_test.go |
deleted file mode 100644 |
index 99aab32142ff24811f2991b753ae5f26c6772a7a..0000000000000000000000000000000000000000 |
--- a/appengine/logdog/coordinator/backend/archiveCron_test.go |
+++ /dev/null |
@@ -1,197 +0,0 @@ |
-// Copyright 2015 The Chromium Authors. All rights reserved. |
-// Use of this source code is governed by a BSD-style license that can be |
-// found in the LICENSE file. |
- |
-package backend |
- |
-import ( |
- "fmt" |
- "net/http" |
- "net/http/httptest" |
- "testing" |
- "time" |
- |
- "github.com/julienschmidt/httprouter" |
- ds "github.com/luci/gae/service/datastore" |
- "github.com/luci/luci-go/appengine/gaetesting" |
- "github.com/luci/luci-go/appengine/logdog/coordinator" |
- ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest" |
- "github.com/luci/luci-go/common/clock" |
- "github.com/luci/luci-go/common/clock/testclock" |
- "github.com/luci/luci-go/common/errors" |
- "github.com/luci/luci-go/common/proto/google" |
- |
- . "github.com/smartystreets/goconvey/convey" |
-) |
- |
-func TestHandleArchiveCron(t *testing.T) { |
- t.Parallel() |
- |
- Convey(`A testing environment`, t, func() { |
- c := gaetesting.TestingContext() |
- c, _ = testclock.UseTime(c, testclock.TestTimeUTC) |
- |
- // Add the archival index from "index.yaml". |
- ds.Get(c).Testable().AddIndexes( |
- &ds.IndexDefinition{ |
- Kind: "LogStream", |
- SortBy: []ds.IndexColumn{ |
- {Property: "State"}, |
- {Property: "Created", Descending: true}, |
- }, |
- }, |
- ) |
- |
- // Install a base set of streams. |
- // |
- // If the stream was created >= 10min ago, it will be candidate for |
- // archival. |
- now := clock.Now(c) |
- for _, v := range []struct { |
- name string |
- d time.Duration // Time before "now" for this streams Created. |
- state coordinator.LogStreamState |
- }{ |
- {"foo", 0, coordinator.LSStreaming}, // Not candidate for archival (too soon). |
- {"bar", 10 * time.Minute, coordinator.LSStreaming}, // Candidate for archival. |
- {"baz", 10 * time.Minute, coordinator.LSArchiveTasked}, // Not candidate for archival (archive tasked). |
- {"qux", 10 * time.Hour, coordinator.LSArchived}, // Not candidate for archival (archived). |
- {"quux", 10 * time.Hour, coordinator.LSStreaming}, // Candidate for archival. |
- } { |
- ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, v.name)) |
- |
- // The entry was created a week ago. |
- ls.Created = now.Add(-v.d) |
- ls.State = v.state |
- ls.TerminatedTime = now // Doesn't matter, but required to Put the stream. |
- ls.ArchivedTime = now // Doesn't matter, but required to Put the stream. |
- if err := ds.Get(c).Put(ls); err != nil { |
- panic(err) |
- } |
- } |
- ds.Get(c).Testable().CatchupIndexes() |
- |
- // This allows us to update our Context in test setup. |
- tap := ct.ArchivalPublisher{} |
- svcStub := ct.Services{ |
- AP: func() (coordinator.ArchivalPublisher, error) { |
- return &tap, nil |
- }, |
- } |
- c = coordinator.WithServices(c, &svcStub) |
- |
- b := Backend{} |
- |
- tb := testBase{Context: c} |
- r := httprouter.New() |
- b.InstallHandlers(r, tb.base) |
- |
- s := httptest.NewServer(r) |
- defer s.Close() |
- |
- endpoint := fmt.Sprintf("%s/archive/cron", s.URL) |
- |
- Convey(`With no configuration loaded, the endpoint will fail.`, func() { |
- resp, err := http.Get(endpoint) |
- So(err, ShouldBeNil) |
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) |
- }) |
- |
- Convey(`With no task topic configured, the endpoint will fail.`, func() { |
- svcStub.InitConfig() |
- |
- resp, err := http.Get(endpoint) |
- So(err, ShouldBeNil) |
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) |
- }) |
- |
- Convey(`With a configuration loaded`, func() { |
- svcStub.InitConfig() |
- svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projects/test/topics/archive-test-topic" |
- svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = google.NewDuration(10 * time.Second) |
- svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = google.NewDuration(10 * time.Minute) |
- |
- Convey(`A request to the endpoint will be successful.`, func() { |
- resp, err := http.Get(endpoint) |
- So(err, ShouldBeNil) |
- So(resp.StatusCode, ShouldEqual, http.StatusOK) |
- |
- // Candidate tasks should be scheduled. |
- So(tap.StreamNames(), ShouldResemble, []string{"bar", "quux"}) |
- |
- // Hit the endpoint again, no additional tasks should be scheduled. |
- Convey(`A subsequent endpoint hit will not schedule any additional tasks.`, func() { |
- resp, err = http.Get(endpoint) |
- So(err, ShouldBeNil) |
- So(resp.StatusCode, ShouldEqual, http.StatusOK) |
- So(tap.StreamNames(), ShouldResemble, []string{"bar", "quux"}) |
- }) |
- }) |
- |
- Convey(`When scheduling multiple tasks`, func() { |
- b.multiTaskBatchSize = 5 |
- |
- // Create a lot of archival candidate tasks to schedule. |
- // |
- // Note that since task queue names are returned sorted, we want to |
- // name these streams greater than our stock stream names. |
- names := []string{"bar", "quux"} |
- for i := 0; i < 11; i++ { |
- ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, fmt.Sprintf("stream-%02d", i))) |
- |
- ls.Created = now.Add(-(10 * time.Minute)) |
- ls.State = coordinator.LSStreaming |
- ls.TerminalIndex = 1337 |
- ls.TerminatedTime = now |
- if err := ds.Get(c).Put(ls); err != nil { |
- panic(err) |
- } |
- |
- names = append(names, ls.Name) |
- } |
- ds.Get(c).Testable().CatchupIndexes() |
- |
- Convey(`Will schedule all pages properly.`, func() { |
- // Ensure that all of these tasks get added to the task queue. |
- resp, err := http.Get(endpoint) |
- So(err, ShouldBeNil) |
- So(resp.StatusCode, ShouldEqual, http.StatusOK) |
- So(tap.StreamNames(), ShouldResemble, names) |
- |
- // Use this opportunity to assert that none of the scheduled streams |
- // have any settle or completion delay. |
- for _, at := range tap.Tasks() { |
- So(at.SettleDelay.Duration(), ShouldEqual, 0) |
- So(at.CompletePeriod.Duration(), ShouldEqual, 0) |
- } |
- |
- Convey(`Will not schedule additional tasks on the next run.`, func() { |
- resp, err := http.Get(endpoint) |
- So(err, ShouldBeNil) |
- So(resp.StatusCode, ShouldEqual, http.StatusOK) |
- So(tap.StreamNames(), ShouldResemble, names) |
- }) |
- }) |
- |
- Convey(`Will not schedule tasks if task scheduling fails.`, func() { |
- tap.Err = errors.New("test error") |
- |
- // Ensure that all of these tasks get added to the task queue. |
- resp, err := http.Get(endpoint) |
- So(err, ShouldBeNil) |
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) |
- So(tap.StreamNames(), ShouldResemble, []string{}) |
- |
- Convey(`And will schedule streams next run.`, func() { |
- tap.Err = nil |
- |
- resp, err := http.Get(endpoint) |
- So(err, ShouldBeNil) |
- So(resp.StatusCode, ShouldEqual, http.StatusOK) |
- So(tap.StreamNames(), ShouldResemble, names) |
- }) |
- }) |
- }) |
- }) |
- }) |
-} |