Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(95)

Side by Side Diff: appengine/logdog/coordinator/backend/archiveCron_test.go

Issue 1910633006: LogDog: Support per-namespace expired archival. (Closed) Base URL: https://github.com/luci/luci-go@logdog-coordinator-svcdec
Patch Set: Update another test. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 package backend
6
7 import (
8 "fmt"
9 "net/http"
10 "net/http/httptest"
11 "testing"
12 "time"
13
14 "github.com/julienschmidt/httprouter"
15 ds "github.com/luci/gae/service/datastore"
16 "github.com/luci/luci-go/appengine/gaetesting"
17 "github.com/luci/luci-go/appengine/logdog/coordinator"
18 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
19 "github.com/luci/luci-go/common/clock"
20 "github.com/luci/luci-go/common/clock/testclock"
21 "github.com/luci/luci-go/common/errors"
22 "github.com/luci/luci-go/common/proto/google"
23
24 . "github.com/smartystreets/goconvey/convey"
25 )
26
27 func TestHandleArchiveCron(t *testing.T) {
28 t.Parallel()
29
30 Convey(`A testing environment`, t, func() {
31 c := gaetesting.TestingContext()
32 c, _ = testclock.UseTime(c, testclock.TestTimeUTC)
33
34 // Add the archival index from "index.yaml".
35 ds.Get(c).Testable().AddIndexes(
36 &ds.IndexDefinition{
37 Kind: "LogStream",
38 SortBy: []ds.IndexColumn{
39 {Property: "State"},
40 {Property: "Created", Descending: true},
41 },
42 },
43 )
44
45 // Install a base set of streams.
46 //
47 // If the stream was created >= 10min ago, it will be candidate for
48 // archival.
49 now := clock.Now(c)
50 for _, v := range []struct {
51 name string
52 d time.Duration // Time before "now" for this stream s Created.
53 state coordinator.LogStreamState
54 }{
55 {"foo", 0, coordinator.LSStreaming}, // Not candidate for archival (too soon).
56 {"bar", 10 * time.Minute, coordinator.LSStreaming}, // Candidate for archival.
57 {"baz", 10 * time.Minute, coordinator.LSArchiveTasked}, // Not candidate for archival (archive tasked).
58 {"qux", 10 * time.Hour, coordinator.LSArchived}, // Not candidate for archival (archived).
59 {"quux", 10 * time.Hour, coordinator.LSStreaming}, // Candidate for archival.
60 } {
61 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, v.name))
62
63 // The entry was created a week ago.
64 ls.Created = now.Add(-v.d)
65 ls.State = v.state
66 ls.TerminatedTime = now // Doesn't matter, but required to Put the stream.
67 ls.ArchivedTime = now // Doesn't matter, but required to Put the stream.
68 if err := ds.Get(c).Put(ls); err != nil {
69 panic(err)
70 }
71 }
72 ds.Get(c).Testable().CatchupIndexes()
73
74 // This allows us to update our Context in test setup.
75 tap := ct.ArchivalPublisher{}
76 svcStub := ct.Services{
77 AP: func() (coordinator.ArchivalPublisher, error) {
78 return &tap, nil
79 },
80 }
81 c = coordinator.WithServices(c, &svcStub)
82
83 b := Backend{}
84
85 tb := testBase{Context: c}
86 r := httprouter.New()
87 b.InstallHandlers(r, tb.base)
88
89 s := httptest.NewServer(r)
90 defer s.Close()
91
92 endpoint := fmt.Sprintf("%s/archive/cron", s.URL)
93
94 Convey(`With no configuration loaded, the endpoint will fail.`, func() {
95 resp, err := http.Get(endpoint)
96 So(err, ShouldBeNil)
97 So(resp.StatusCode, ShouldEqual, http.StatusInternalServ erError)
98 })
99
100 Convey(`With no task topic configured, the endpoint will fail.`, func() {
101 svcStub.InitConfig()
102
103 resp, err := http.Get(endpoint)
104 So(err, ShouldBeNil)
105 So(resp.StatusCode, ShouldEqual, http.StatusInternalServ erError)
106 })
107
108 Convey(`With a configuration loaded`, func() {
109 svcStub.InitConfig()
110 svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projec ts/test/topics/archive-test-topic"
111 svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = g oogle.NewDuration(10 * time.Second)
112 svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = goog le.NewDuration(10 * time.Minute)
113
114 Convey(`A request to the endpoint will be successful.`, func() {
115 resp, err := http.Get(endpoint)
116 So(err, ShouldBeNil)
117 So(resp.StatusCode, ShouldEqual, http.StatusOK)
118
119 // Candidate tasks should be scheduled.
120 So(tap.StreamNames(), ShouldResemble, []string{" bar", "quux"})
121
122 // Hit the endpoint again, no additional tasks s hould be scheduled.
123 Convey(`A subsequent endpoint hit will not sched ule any additional tasks.`, func() {
124 resp, err = http.Get(endpoint)
125 So(err, ShouldBeNil)
126 So(resp.StatusCode, ShouldEqual, http.St atusOK)
127 So(tap.StreamNames(), ShouldResemble, [] string{"bar", "quux"})
128 })
129 })
130
131 Convey(`When scheduling multiple tasks`, func() {
132 b.multiTaskBatchSize = 5
133
134 // Create a lot of archival candidate tasks to s chedule.
135 //
136 // Note that since task queue names are returned sorted, we want to
137 // name these streams greater than our stock str eam names.
138 names := []string{"bar", "quux"}
139 for i := 0; i < 11; i++ {
140 ls := ct.TestLogStream(c, ct.TestLogStre amDescriptor(c, fmt.Sprintf("stream-%02d", i)))
141
142 ls.Created = now.Add(-(10 * time.Minute) )
143 ls.State = coordinator.LSStreaming
144 ls.TerminalIndex = 1337
145 ls.TerminatedTime = now
146 if err := ds.Get(c).Put(ls); err != nil {
147 panic(err)
148 }
149
150 names = append(names, ls.Name)
151 }
152 ds.Get(c).Testable().CatchupIndexes()
153
154 Convey(`Will schedule all pages properly.`, func () {
155 // Ensure that all of these tasks get ad ded to the task queue.
156 resp, err := http.Get(endpoint)
157 So(err, ShouldBeNil)
158 So(resp.StatusCode, ShouldEqual, http.St atusOK)
159 So(tap.StreamNames(), ShouldResemble, na mes)
160
161 // Use this opportunity to assert that n one of the scheduled streams
162 // have any settle or completion delay.
163 for _, at := range tap.Tasks() {
164 So(at.SettleDelay.Duration(), Sh ouldEqual, 0)
165 So(at.CompletePeriod.Duration(), ShouldEqual, 0)
166 }
167
168 Convey(`Will not schedule additional tas ks on the next run.`, func() {
169 resp, err := http.Get(endpoint)
170 So(err, ShouldBeNil)
171 So(resp.StatusCode, ShouldEqual, http.StatusOK)
172 So(tap.StreamNames(), ShouldRese mble, names)
173 })
174 })
175
176 Convey(`Will not schedule tasks if task scheduli ng fails.`, func() {
177 tap.Err = errors.New("test error")
178
179 // Ensure that all of these tasks get ad ded to the task queue.
180 resp, err := http.Get(endpoint)
181 So(err, ShouldBeNil)
182 So(resp.StatusCode, ShouldEqual, http.St atusInternalServerError)
183 So(tap.StreamNames(), ShouldResemble, [] string{})
184
185 Convey(`And will schedule streams next r un.`, func() {
186 tap.Err = nil
187
188 resp, err := http.Get(endpoint)
189 So(err, ShouldBeNil)
190 So(resp.StatusCode, ShouldEqual, http.StatusOK)
191 So(tap.StreamNames(), ShouldRese mble, names)
192 })
193 })
194 })
195 })
196 })
197 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/backend/archiveCron.go ('k') | appengine/logdog/coordinator/backend/backend.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698