Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1186)

Unified Diff: appengine/logdog/coordinator/backend/archiveCron_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: appengine/logdog/coordinator/backend/archiveCron_test.go
diff --git a/appengine/logdog/coordinator/backend/archiveCron_test.go b/appengine/logdog/coordinator/backend/archiveCron_test.go
index 88e45352d3a2a30ec23ec3ebc58597edec2d8aa8..b292b9ea26b5651793231a99a79f44339cdf4c5c 100644
--- a/appengine/logdog/coordinator/backend/archiveCron_test.go
+++ b/appengine/logdog/coordinator/backend/archiveCron_test.go
@@ -40,40 +40,42 @@ func TestHandleArchiveCron(t *testing.T) {
Kind: "LogStream",
SortBy: []ds.IndexColumn{
{Property: "State"},
- {Property: "Updated"},
+ {Property: "Created", Descending: true},
},
},
)
// Install a base set of streams.
+ //
+ // If the stream was created >= 10min ago, it will be candidate for
+ // archival.
+ now := clock.Now(c)
for _, v := range []struct {
- name string
- d time.Duration
- term bool
+ name string
+ d time.Duration // Time before "now" for this streams Created.
+ state coordinator.LogStreamState
}{
- {"foo", 0, true}, // Not candidate for archival (too soon).
- {"bar", 10 * time.Minute, true}, // Candidate for archival.
- {"baz", 10 * time.Minute, false}, // Not candidate for archival (not terminal).
- {"qux", 24 * time.Hour, false}, // Candidate for non-terminal archival.
+ {"foo", 0, coordinator.LSStreaming}, // Not candidate for archival (too soon).
+ {"bar", 10 * time.Minute, coordinator.LSStreaming}, // Candidate for archival.
+ {"baz", 10 * time.Minute, coordinator.LSArchiveTasked}, // Not candidate for archival (archive tasked).
+ {"qux", 10 * time.Hour, coordinator.LSArchived}, // Not candidate for archival (archived).
+ {"quux", 10 * time.Hour, coordinator.LSStreaming}, // Candidate for archival.
} {
ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, v.name))
// The entry was created a week ago.
- ls.Created = clock.Now(c).Add(-(7 * 24 * time.Hour))
- ls.Updated = clock.Now(c).Add(-v.d)
- if v.term {
- ls.State = coordinator.LSTerminated
- }
- if err := ls.Put(ds.Get(c)); err != nil {
+ ls.Created = now.Add(-v.d)
+ ls.State = v.state
+ ls.TerminatedTime = now // Doesn't matter, but required to Put the stream.
+ ls.ArchivedTime = now // Doesn't matter, but required to Put the stream.
+ if err := ds.Get(c).Put(ls); err != nil {
panic(err)
}
}
ds.Get(c).Testable().CatchupIndexes()
// This allows us to update our Context in test setup.
- b := Backend{
- multiTaskBatchSize: 5,
- }
+ b := Backend{}
tb := testBase{Context: c}
r := httprouter.New()
@@ -82,156 +84,118 @@ func TestHandleArchiveCron(t *testing.T) {
s := httptest.NewServer(r)
defer s.Close()
- Convey(`With no configuration loaded`, func() {
- for _, ep := range []string{"terminal", "nonterminal"} {
- Convey(fmt.Sprintf(`A %q endpoint hit will fail.`, ep), func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/%s", s.URL, ep))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
- })
- }
+ endpoint := fmt.Sprintf("%s/archive/cron", s.URL)
+
+ Convey(`With no configuration loaded, the endpoint will fail.`, func() {
+ resp, err := http.Get(endpoint)
+ So(err, ShouldBeNil)
+ So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
})
Convey(`With a configuration loaded`, func() {
qName := "archive-test-queue"
c = ct.UseConfig(c, &svcconfig.Coordinator{
- ArchiveDelay: google.NewDuration(10 * time.Minute),
- ArchiveDelayMax: google.NewDuration(24 * time.Hour),
ArchiveTaskQueue: qName,
+ ArchiveDelayMax: google.NewDuration(10 * time.Minute),
})
tb.Context = c
- Convey(`With no task queue configured`, func() {
- for _, ep := range []string{"terminal", "nonterminal", "purge"} {
- Convey(fmt.Sprintf(`A %q endpoint hit will fail.`, ep), func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/%s", s.URL, ep))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
- })
- }
+ Convey(`With no task queue configured, the endpoint will fail.`, func() {
+ resp, err := http.Get(endpoint)
+ So(err, ShouldBeNil)
+ So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
})
Convey(`With a task queue configured`, func() {
tq.Get(c).Testable().CreateQueue(qName)
- Convey(`A terminal endpoint hit will be successful and idempotent.`, func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
-
- // Candidate tasks should be scheduled.
- tasks := tq.Get(c).Testable().GetScheduledTasks()[qName]
- So(tasks, shouldHaveTasks, archiveTaskName("testing/+/bar"))
-
- // Hit the endpoint again, the same tasks should be scheduled.
- resp, err = http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
-
- tasks2 := tq.Get(c).Testable().GetScheduledTasks()[qName]
- So(tasks2, ShouldResemble, tasks)
- })
+ streams := func() []string {
+ s, err := ct.GetArchiveTaskStreams(tq.Get(c), qName)
+ if err != nil {
+ panic(err)
+ }
+ return s
+ }
- Convey(`A non-terminal endpoint hit will be successful and idempotent.`, func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/nonterminal", s.URL))
+ Convey(`A request to the endpoint will be successful.`, func() {
+ resp, err := http.Get(endpoint)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
// Candidate tasks should be scheduled.
- tasks := tq.Get(c).Testable().GetScheduledTasks()[qName]
- So(tasks, shouldHaveTasks, archiveTaskName("testing/+/qux"))
+ So(streams(), ShouldResemble, []string{"bar", "quux"})
- // Hit the endpoint again, the same tasks should be scheduled.
- resp, err = http.Get(fmt.Sprintf("%s/archive/cron/nonterminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
-
- tasks2 := tq.Get(c).Testable().GetScheduledTasks()[qName]
- So(tasks2, ShouldResemble, tasks)
- })
-
- Convey(`A terminal endpoint hit followed by a non-terminal endpoint hit will be successful.`, func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks, archiveTaskName("testing/+/bar"))
-
- resp, err = http.Get(fmt.Sprintf("%s/archive/cron/nonterminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks,
- archiveTaskName("testing/+/bar"), archiveTaskName("testing/+/qux"))
+ // Hit the endpoint again, no additional tasks should be scheduled.
+ Convey(`A subsequent endpoint hit will not schedule any additional tasks.`, func() {
+ resp, err = http.Get(endpoint)
+ So(err, ShouldBeNil)
+ So(resp.StatusCode, ShouldEqual, http.StatusOK)
+ So(streams(), ShouldResemble, []string{"bar", "quux"})
+ })
})
Convey(`When scheduling multiple tasks`, func() {
+ b.multiTaskBatchSize = 5
+
// Create a lot of archival candidate tasks to schedule.
- var names []string
+ //
+ // Note that since task queue names are returned sorted, we want to
+ // name these streams greater than our stock stream names.
+ names := []string{"bar", "quux"}
for i := 0; i < 11; i++ {
- ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, fmt.Sprintf("stream-%d", i)))
+ ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, fmt.Sprintf("stream-%02d", i)))
- ls.Created = clock.Now(c).Add(-(10 * time.Minute))
- ls.Updated = ls.Created
- ls.State = coordinator.LSTerminated
+ ls.Created = now.Add(-(10 * time.Minute))
+ ls.State = coordinator.LSStreaming
ls.TerminalIndex = 1337
- if err := ls.Put(ds.Get(c)); err != nil {
+ ls.TerminatedTime = now
+ if err := ds.Get(c).Put(ls); err != nil {
panic(err)
}
names = append(names, ls.Name)
}
- names = append(names, "bar")
ds.Get(c).Testable().CatchupIndexes()
Convey(`Will schedule all pages properly.`, func() {
- taskNames := make([]interface{}, len(names))
- for i, n := range names {
- taskNames[i] = archiveTaskName(fmt.Sprintf("testing/+/%s", n))
- }
-
// Ensure that all of these tasks get added to the task queue.
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
+ resp, err := http.Get(endpoint)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks, taskNames...)
+ So(streams(), ShouldResemble, names)
- Convey(`Will be successful when rescheduling the same tasks.`, func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
+ // Verify that all tasks are configured to execute immediately.
+ for _, t := range tq.Get(c).Testable().GetScheduledTasks()[qName] {
+ So(t.ETA, ShouldResemble, now)
+ }
+
+ Convey(`Will not schedule additional tasks on the next run.`, func() {
+ resp, err := http.Get(endpoint)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks, taskNames...)
+ So(streams(), ShouldResemble, names)
})
})
- Convey(`Will return an error if task scheduling fails.`, func() {
+ Convey(`Will not schedule tasks if task scheduling fails.`, func() {
c, fb := featureBreaker.FilterTQ(c, nil)
tb.Context = c
fb.BreakFeatures(errors.New("test error"), "AddMulti")
// Ensure that all of these tasks get added to the task queue.
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
+ resp, err := http.Get(endpoint)
So(err, ShouldBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
- })
+ So(streams(), ShouldResemble, []string{})
- Convey(`Will return an error if a single task scheduling fails.`, func() {
- merr := make(errors.MultiError, len(names))
- merr[0] = errors.New("test error")
+ Convey(`And will schedule streams next run.`, func() {
+ fb.UnbreakFeatures("AddMulti")
- c, fb := featureBreaker.FilterTQ(c, nil)
- tb.Context = c
- fb.BreakFeatures(merr, "AddMulti")
-
- // Ensure that all of these tasks get added to the task queue.
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/terminal", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
- })
-
- Convey(`And a purge endpoint hit will purge the tasks.`, func() {
- resp, err := http.Get(fmt.Sprintf("%s/archive/cron/purge", s.URL))
- So(err, ShouldBeNil)
- So(resp.StatusCode, ShouldEqual, http.StatusOK)
- So(tq.Get(c).Testable().GetScheduledTasks()[qName], shouldHaveTasks)
+ resp, err := http.Get(endpoint)
+ So(err, ShouldBeNil)
+ So(resp.StatusCode, ShouldEqual, http.StatusOK)
+ So(streams(), ShouldResemble, names)
+ })
})
})
})

Powered by Google App Engine
This is Rietveld 408576698