| OLD | NEW |
| (Empty) |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | |
| 2 // Use of this source code is governed by a BSD-style license that can be | |
| 3 // found in the LICENSE file. | |
| 4 | |
| 5 package backend | |
| 6 | |
| 7 import ( | |
| 8 "fmt" | |
| 9 "net/http" | |
| 10 "net/http/httptest" | |
| 11 "testing" | |
| 12 "time" | |
| 13 | |
| 14 "github.com/julienschmidt/httprouter" | |
| 15 ds "github.com/luci/gae/service/datastore" | |
| 16 "github.com/luci/luci-go/appengine/gaetesting" | |
| 17 "github.com/luci/luci-go/appengine/logdog/coordinator" | |
| 18 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | |
| 19 "github.com/luci/luci-go/common/clock" | |
| 20 "github.com/luci/luci-go/common/clock/testclock" | |
| 21 "github.com/luci/luci-go/common/errors" | |
| 22 "github.com/luci/luci-go/common/proto/google" | |
| 23 | |
| 24 . "github.com/smartystreets/goconvey/convey" | |
| 25 ) | |
| 26 | |
| 27 func TestHandleArchiveCron(t *testing.T) { | |
| 28 t.Parallel() | |
| 29 | |
| 30 Convey(`A testing environment`, t, func() { | |
| 31 c := gaetesting.TestingContext() | |
| 32 c, _ = testclock.UseTime(c, testclock.TestTimeUTC) | |
| 33 | |
| 34 // Add the archival index from "index.yaml". | |
| 35 ds.Get(c).Testable().AddIndexes( | |
| 36 &ds.IndexDefinition{ | |
| 37 Kind: "LogStream", | |
| 38 SortBy: []ds.IndexColumn{ | |
| 39 {Property: "State"}, | |
| 40 {Property: "Created", Descending: true}, | |
| 41 }, | |
| 42 }, | |
| 43 ) | |
| 44 | |
| 45 // Install a base set of streams. | |
| 46 // | |
| 47 // If the stream was created >= 10min ago, it will be candidate
for | |
| 48 // archival. | |
| 49 now := clock.Now(c) | |
| 50 for _, v := range []struct { | |
| 51 name string | |
| 52 d time.Duration // Time before "now" for this stream
s Created. | |
| 53 state coordinator.LogStreamState | |
| 54 }{ | |
| 55 {"foo", 0, coordinator.LSStreaming},
// Not candidate for archival (too soon). | |
| 56 {"bar", 10 * time.Minute, coordinator.LSStreaming},
// Candidate for archival. | |
| 57 {"baz", 10 * time.Minute, coordinator.LSArchiveTasked},
// Not candidate for archival (archive tasked). | |
| 58 {"qux", 10 * time.Hour, coordinator.LSArchived},
// Not candidate for archival (archived). | |
| 59 {"quux", 10 * time.Hour, coordinator.LSStreaming},
// Candidate for archival. | |
| 60 } { | |
| 61 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c,
v.name)) | |
| 62 | |
| 63 // The entry was created a week ago. | |
| 64 ls.Created = now.Add(-v.d) | |
| 65 ls.State = v.state | |
| 66 ls.TerminatedTime = now // Doesn't matter, but required
to Put the stream. | |
| 67 ls.ArchivedTime = now // Doesn't matter, but required
to Put the stream. | |
| 68 if err := ds.Get(c).Put(ls); err != nil { | |
| 69 panic(err) | |
| 70 } | |
| 71 } | |
| 72 ds.Get(c).Testable().CatchupIndexes() | |
| 73 | |
| 74 // This allows us to update our Context in test setup. | |
| 75 tap := ct.ArchivalPublisher{} | |
| 76 svcStub := ct.Services{ | |
| 77 AP: func() (coordinator.ArchivalPublisher, error) { | |
| 78 return &tap, nil | |
| 79 }, | |
| 80 } | |
| 81 c = coordinator.WithServices(c, &svcStub) | |
| 82 | |
| 83 b := Backend{} | |
| 84 | |
| 85 tb := testBase{Context: c} | |
| 86 r := httprouter.New() | |
| 87 b.InstallHandlers(r, tb.base) | |
| 88 | |
| 89 s := httptest.NewServer(r) | |
| 90 defer s.Close() | |
| 91 | |
| 92 endpoint := fmt.Sprintf("%s/archive/cron", s.URL) | |
| 93 | |
| 94 Convey(`With no configuration loaded, the endpoint will fail.`,
func() { | |
| 95 resp, err := http.Get(endpoint) | |
| 96 So(err, ShouldBeNil) | |
| 97 So(resp.StatusCode, ShouldEqual, http.StatusInternalServ
erError) | |
| 98 }) | |
| 99 | |
| 100 Convey(`With no task topic configured, the endpoint will fail.`,
func() { | |
| 101 svcStub.InitConfig() | |
| 102 | |
| 103 resp, err := http.Get(endpoint) | |
| 104 So(err, ShouldBeNil) | |
| 105 So(resp.StatusCode, ShouldEqual, http.StatusInternalServ
erError) | |
| 106 }) | |
| 107 | |
| 108 Convey(`With a configuration loaded`, func() { | |
| 109 svcStub.InitConfig() | |
| 110 svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projec
ts/test/topics/archive-test-topic" | |
| 111 svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = g
oogle.NewDuration(10 * time.Second) | |
| 112 svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = goog
le.NewDuration(10 * time.Minute) | |
| 113 | |
| 114 Convey(`A request to the endpoint will be successful.`,
func() { | |
| 115 resp, err := http.Get(endpoint) | |
| 116 So(err, ShouldBeNil) | |
| 117 So(resp.StatusCode, ShouldEqual, http.StatusOK) | |
| 118 | |
| 119 // Candidate tasks should be scheduled. | |
| 120 So(tap.StreamNames(), ShouldResemble, []string{"
bar", "quux"}) | |
| 121 | |
| 122 // Hit the endpoint again, no additional tasks s
hould be scheduled. | |
| 123 Convey(`A subsequent endpoint hit will not sched
ule any additional tasks.`, func() { | |
| 124 resp, err = http.Get(endpoint) | |
| 125 So(err, ShouldBeNil) | |
| 126 So(resp.StatusCode, ShouldEqual, http.St
atusOK) | |
| 127 So(tap.StreamNames(), ShouldResemble, []
string{"bar", "quux"}) | |
| 128 }) | |
| 129 }) | |
| 130 | |
| 131 Convey(`When scheduling multiple tasks`, func() { | |
| 132 b.multiTaskBatchSize = 5 | |
| 133 | |
| 134 // Create a lot of archival candidate tasks to s
chedule. | |
| 135 // | |
| 136 // Note that since task queue names are returned
sorted, we want to | |
| 137 // name these streams greater than our stock str
eam names. | |
| 138 names := []string{"bar", "quux"} | |
| 139 for i := 0; i < 11; i++ { | |
| 140 ls := ct.TestLogStream(c, ct.TestLogStre
amDescriptor(c, fmt.Sprintf("stream-%02d", i))) | |
| 141 | |
| 142 ls.Created = now.Add(-(10 * time.Minute)
) | |
| 143 ls.State = coordinator.LSStreaming | |
| 144 ls.TerminalIndex = 1337 | |
| 145 ls.TerminatedTime = now | |
| 146 if err := ds.Get(c).Put(ls); err != nil
{ | |
| 147 panic(err) | |
| 148 } | |
| 149 | |
| 150 names = append(names, ls.Name) | |
| 151 } | |
| 152 ds.Get(c).Testable().CatchupIndexes() | |
| 153 | |
| 154 Convey(`Will schedule all pages properly.`, func
() { | |
| 155 // Ensure that all of these tasks get ad
ded to the task queue. | |
| 156 resp, err := http.Get(endpoint) | |
| 157 So(err, ShouldBeNil) | |
| 158 So(resp.StatusCode, ShouldEqual, http.St
atusOK) | |
| 159 So(tap.StreamNames(), ShouldResemble, na
mes) | |
| 160 | |
| 161 // Use this opportunity to assert that n
one of the scheduled streams | |
| 162 // have any settle or completion delay. | |
| 163 for _, at := range tap.Tasks() { | |
| 164 So(at.SettleDelay.Duration(), Sh
ouldEqual, 0) | |
| 165 So(at.CompletePeriod.Duration(),
ShouldEqual, 0) | |
| 166 } | |
| 167 | |
| 168 Convey(`Will not schedule additional tas
ks on the next run.`, func() { | |
| 169 resp, err := http.Get(endpoint) | |
| 170 So(err, ShouldBeNil) | |
| 171 So(resp.StatusCode, ShouldEqual,
http.StatusOK) | |
| 172 So(tap.StreamNames(), ShouldRese
mble, names) | |
| 173 }) | |
| 174 }) | |
| 175 | |
| 176 Convey(`Will not schedule tasks if task scheduli
ng fails.`, func() { | |
| 177 tap.Err = errors.New("test error") | |
| 178 | |
| 179 // Ensure that all of these tasks get ad
ded to the task queue. | |
| 180 resp, err := http.Get(endpoint) | |
| 181 So(err, ShouldBeNil) | |
| 182 So(resp.StatusCode, ShouldEqual, http.St
atusInternalServerError) | |
| 183 So(tap.StreamNames(), ShouldResemble, []
string{}) | |
| 184 | |
| 185 Convey(`And will schedule streams next r
un.`, func() { | |
| 186 tap.Err = nil | |
| 187 | |
| 188 resp, err := http.Get(endpoint) | |
| 189 So(err, ShouldBeNil) | |
| 190 So(resp.StatusCode, ShouldEqual,
http.StatusOK) | |
| 191 So(tap.StreamNames(), ShouldRese
mble, names) | |
| 192 }) | |
| 193 }) | |
| 194 }) | |
| 195 }) | |
| 196 }) | |
| 197 } | |
| OLD | NEW |