| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package backend | 5 package backend |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 "net/http/httptest" | 10 "net/http/httptest" |
| 11 "sort" |
| 11 "testing" | 12 "testing" |
| 12 "time" | 13 "time" |
| 13 | 14 |
| 14 "github.com/julienschmidt/httprouter" | 15 "github.com/julienschmidt/httprouter" |
| 15 ds "github.com/luci/gae/service/datastore" | 16 ds "github.com/luci/gae/service/datastore" |
| 17 tq "github.com/luci/gae/service/taskqueue" |
| 16 "github.com/luci/luci-go/appengine/gaetesting" | 18 "github.com/luci/luci-go/appengine/gaetesting" |
| 17 "github.com/luci/luci-go/appengine/logdog/coordinator" | 19 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 18 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 20 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
| 19 "github.com/luci/luci-go/common/clock" | 21 "github.com/luci/luci-go/common/clock" |
| 20 "github.com/luci/luci-go/common/clock/testclock" | 22 "github.com/luci/luci-go/common/clock/testclock" |
| 23 memoryConfig "github.com/luci/luci-go/common/config/impl/memory" |
| 21 "github.com/luci/luci-go/common/errors" | 24 "github.com/luci/luci-go/common/errors" |
| 22 "github.com/luci/luci-go/common/proto/google" | 25 "github.com/luci/luci-go/common/proto/google" |
| 23 | 26 |
| 24 . "github.com/smartystreets/goconvey/convey" | 27 . "github.com/smartystreets/goconvey/convey" |
| 25 ) | 28 ) |
| 26 | 29 |
| 27 func TestHandleArchiveCron(t *testing.T) { | 30 func TestHandleArchiveCron(t *testing.T) { |
| 28 t.Parallel() | 31 t.Parallel() |
| 29 | 32 |
| 33 const queueName = "test-archive-scan-queue" |
| 34 |
| 35 Convey(`A testing environment`, t, func() { |
| 36 c := gaetesting.TestingContext() |
| 37 |
| 38 mcfg := map[string]memoryConfig.ConfigSet{} |
| 39 c = memoryConfig.Use(c, mcfg) |
| 40 |
| 41 svcStub := ct.Services{} |
| 42 svcStub.InitConfig() |
| 43 svcStub.ServiceConfig.Coordinator.ArchiveScanProjectQueueName =
queueName |
| 44 c = coordinator.WithServices(c, &svcStub) |
| 45 |
| 46 b := Backend{} |
| 47 |
| 48 tb := testBase{Context: c} |
| 49 r := httprouter.New() |
| 50 b.InstallHandlers(r, tb.base) |
| 51 |
| 52 s := httptest.NewServer(r) |
| 53 defer s.Close() |
| 54 |
| 55 endpoint := fmt.Sprintf("%s/archive/cron", s.URL) |
| 56 |
| 57 Convey(`When projects are registered`, func() { |
| 58 mcfg["projects/foo"] = memoryConfig.ConfigSet{} |
| 59 mcfg["projects/bar"] = memoryConfig.ConfigSet{} |
| 60 mcfg["projects/baz"] = memoryConfig.ConfigSet{} |
| 61 |
| 62 Convey(`When the testing queue exists`, func() { |
| 63 tq.Get(c).Testable().CreateQueue(queueName) |
| 64 |
| 65 Convey(`Will dispatch archive tasks for each reg
istered project.`, func() { |
| 66 resp, err := http.Get(endpoint) |
| 67 So(err, ShouldBeNil) |
| 68 So(resp.StatusCode, ShouldEqual, http.St
atusOK) |
| 69 |
| 70 var namespaces []string |
| 71 for _, task := range tq.Get(c).Testable(
).GetScheduledTasks()[queueName] { |
| 72 namespaces = append(namespaces,
task.Header["X-Appengine-Current-Namespace"]...) |
| 73 } |
| 74 sort.Strings(namespaces) |
| 75 |
| 76 pns := coordinator.ProjectNamespace |
| 77 So(namespaces, ShouldResemble, []string{
pns("bar"), pns("baz"), pns("foo")}) |
| 78 }) |
| 79 }) |
| 80 }) |
| 81 }) |
| 82 } |
| 83 |
| 84 func TestHandleArchiveScanTask(t *testing.T) { |
| 85 t.Parallel() |
| 86 |
| 30 Convey(`A testing environment`, t, func() { | 87 Convey(`A testing environment`, t, func() { |
| 31 c := gaetesting.TestingContext() | 88 c := gaetesting.TestingContext() |
| 32 c, _ = testclock.UseTime(c, testclock.TestTimeUTC) | 89 c, _ = testclock.UseTime(c, testclock.TestTimeUTC) |
| 33 | 90 |
| 91 // In production, this task will be called with the namespace of
is task |
| 92 // queue entry installed. However, since we're calling it manual
ly, we'll |
| 93 // just fix the namespace. |
| 94 if err := coordinator.WithProjectNamespace(&c, "test"); err != n
il { |
| 95 panic(err) |
| 96 } |
| 97 |
| 34 // Add the archival index from "index.yaml". | 98 // Add the archival index from "index.yaml". |
| 35 ds.Get(c).Testable().AddIndexes( | 99 ds.Get(c).Testable().AddIndexes( |
| 36 &ds.IndexDefinition{ | 100 &ds.IndexDefinition{ |
| 37 Kind: "LogStream", | 101 Kind: "LogStream", |
| 38 SortBy: []ds.IndexColumn{ | 102 SortBy: []ds.IndexColumn{ |
| 39 {Property: "State"}, | 103 {Property: "State"}, |
| 40 {Property: "Created", Descending: true}, | 104 {Property: "Created", Descending: true}, |
| 41 }, | 105 }, |
| 42 }, | 106 }, |
| 43 ) | 107 ) |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 82 | 146 |
| 83 b := Backend{} | 147 b := Backend{} |
| 84 | 148 |
| 85 tb := testBase{Context: c} | 149 tb := testBase{Context: c} |
| 86 r := httprouter.New() | 150 r := httprouter.New() |
| 87 b.InstallHandlers(r, tb.base) | 151 b.InstallHandlers(r, tb.base) |
| 88 | 152 |
| 89 s := httptest.NewServer(r) | 153 s := httptest.NewServer(r) |
| 90 defer s.Close() | 154 defer s.Close() |
| 91 | 155 |
| 92 » » endpoint := fmt.Sprintf("%s/archive/cron", s.URL) | 156 » » endpoint := fmt.Sprintf("%s/archive/cron/scan", s.URL) |
| 93 | 157 |
| 94 Convey(`With no configuration loaded, the endpoint will fail.`,
func() { | 158 Convey(`With no configuration loaded, the endpoint will fail.`,
func() { |
| 95 » » » resp, err := http.Get(endpoint) | 159 » » » resp, err := http.Post(endpoint, "", nil) |
| 96 So(err, ShouldBeNil) | 160 So(err, ShouldBeNil) |
| 97 So(resp.StatusCode, ShouldEqual, http.StatusInternalServ
erError) | 161 So(resp.StatusCode, ShouldEqual, http.StatusInternalServ
erError) |
| 98 }) | 162 }) |
| 99 | 163 |
| 100 Convey(`With no task topic configured, the endpoint will fail.`,
func() { | 164 Convey(`With no task topic configured, the endpoint will fail.`,
func() { |
| 101 svcStub.InitConfig() | 165 svcStub.InitConfig() |
| 102 | 166 |
| 103 » » » resp, err := http.Get(endpoint) | 167 » » » resp, err := http.Post(endpoint, "", nil) |
| 104 So(err, ShouldBeNil) | 168 So(err, ShouldBeNil) |
| 105 So(resp.StatusCode, ShouldEqual, http.StatusInternalServ
erError) | 169 So(resp.StatusCode, ShouldEqual, http.StatusInternalServ
erError) |
| 106 }) | 170 }) |
| 107 | 171 |
| 108 Convey(`With a configuration loaded`, func() { | 172 Convey(`With a configuration loaded`, func() { |
| 109 svcStub.InitConfig() | 173 svcStub.InitConfig() |
| 110 svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projec
ts/test/topics/archive-test-topic" | 174 svcStub.ServiceConfig.Coordinator.ArchiveTopic = "projec
ts/test/topics/archive-test-topic" |
| 111 svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = g
oogle.NewDuration(10 * time.Second) | 175 svcStub.ServiceConfig.Coordinator.ArchiveSettleDelay = g
oogle.NewDuration(10 * time.Second) |
| 112 svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = goog
le.NewDuration(10 * time.Minute) | 176 svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = goog
le.NewDuration(10 * time.Minute) |
| 113 | 177 |
| 114 Convey(`A request to the endpoint will be successful.`,
func() { | 178 Convey(`A request to the endpoint will be successful.`,
func() { |
| 115 » » » » resp, err := http.Get(endpoint) | 179 » » » » resp, err := http.Post(endpoint, "", nil) |
| 116 So(err, ShouldBeNil) | 180 So(err, ShouldBeNil) |
| 117 So(resp.StatusCode, ShouldEqual, http.StatusOK) | 181 So(resp.StatusCode, ShouldEqual, http.StatusOK) |
| 118 | 182 |
| 119 // Candidate tasks should be scheduled. | 183 // Candidate tasks should be scheduled. |
| 120 So(tap.StreamNames(), ShouldResemble, []string{"
bar", "quux"}) | 184 So(tap.StreamNames(), ShouldResemble, []string{"
bar", "quux"}) |
| 121 | 185 |
| 122 // Hit the endpoint again, no additional tasks s
hould be scheduled. | 186 // Hit the endpoint again, no additional tasks s
hould be scheduled. |
| 123 Convey(`A subsequent endpoint hit will not sched
ule any additional tasks.`, func() { | 187 Convey(`A subsequent endpoint hit will not sched
ule any additional tasks.`, func() { |
| 124 » » » » » resp, err = http.Get(endpoint) | 188 » » » » » resp, err = http.Post(endpoint, "", nil) |
| 125 So(err, ShouldBeNil) | 189 So(err, ShouldBeNil) |
| 126 So(resp.StatusCode, ShouldEqual, http.St
atusOK) | 190 So(resp.StatusCode, ShouldEqual, http.St
atusOK) |
| 127 So(tap.StreamNames(), ShouldResemble, []
string{"bar", "quux"}) | 191 So(tap.StreamNames(), ShouldResemble, []
string{"bar", "quux"}) |
| 128 }) | 192 }) |
| 129 }) | 193 }) |
| 130 | 194 |
| 131 Convey(`When scheduling multiple tasks`, func() { | 195 Convey(`When scheduling multiple tasks`, func() { |
| 132 b.multiTaskBatchSize = 5 | 196 b.multiTaskBatchSize = 5 |
| 133 | 197 |
| 134 // Create a lot of archival candidate tasks to s
chedule. | 198 // Create a lot of archival candidate tasks to s
chedule. |
| (...skipping 11 matching lines...) Expand all Loading... |
| 146 if err := ds.Get(c).Put(ls); err != nil
{ | 210 if err := ds.Get(c).Put(ls); err != nil
{ |
| 147 panic(err) | 211 panic(err) |
| 148 } | 212 } |
| 149 | 213 |
| 150 names = append(names, ls.Name) | 214 names = append(names, ls.Name) |
| 151 } | 215 } |
| 152 ds.Get(c).Testable().CatchupIndexes() | 216 ds.Get(c).Testable().CatchupIndexes() |
| 153 | 217 |
| 154 Convey(`Will schedule all pages properly.`, func
() { | 218 Convey(`Will schedule all pages properly.`, func
() { |
| 155 // Ensure that all of these tasks get ad
ded to the task queue. | 219 // Ensure that all of these tasks get ad
ded to the task queue. |
| 156 » » » » » resp, err := http.Get(endpoint) | 220 » » » » » resp, err := http.Post(endpoint, "", nil
) |
| 157 So(err, ShouldBeNil) | 221 So(err, ShouldBeNil) |
| 158 So(resp.StatusCode, ShouldEqual, http.St
atusOK) | 222 So(resp.StatusCode, ShouldEqual, http.St
atusOK) |
| 159 So(tap.StreamNames(), ShouldResemble, na
mes) | 223 So(tap.StreamNames(), ShouldResemble, na
mes) |
| 160 | 224 |
| 161 // Use this opportunity to assert that n
one of the scheduled streams | 225 // Use this opportunity to assert that n
one of the scheduled streams |
| 162 // have any settle or completion delay. | 226 // have any settle or completion delay. |
| 163 for _, at := range tap.Tasks() { | 227 for _, at := range tap.Tasks() { |
| 164 So(at.SettleDelay.Duration(), Sh
ouldEqual, 0) | 228 So(at.SettleDelay.Duration(), Sh
ouldEqual, 0) |
| 165 So(at.CompletePeriod.Duration(),
ShouldEqual, 0) | 229 So(at.CompletePeriod.Duration(),
ShouldEqual, 0) |
| 166 } | 230 } |
| 167 | 231 |
| 168 Convey(`Will not schedule additional tas
ks on the next run.`, func() { | 232 Convey(`Will not schedule additional tas
ks on the next run.`, func() { |
| 169 » » » » » » resp, err := http.Get(endpoint) | 233 » » » » » » resp, err := http.Post(endpoint,
"", nil) |
| 170 So(err, ShouldBeNil) | 234 So(err, ShouldBeNil) |
| 171 So(resp.StatusCode, ShouldEqual,
http.StatusOK) | 235 So(resp.StatusCode, ShouldEqual,
http.StatusOK) |
| 172 So(tap.StreamNames(), ShouldRese
mble, names) | 236 So(tap.StreamNames(), ShouldRese
mble, names) |
| 173 }) | 237 }) |
| 174 }) | 238 }) |
| 175 | 239 |
| 176 Convey(`Will not schedule tasks if task scheduli
ng fails.`, func() { | 240 Convey(`Will not schedule tasks if task scheduli
ng fails.`, func() { |
| 177 tap.Err = errors.New("test error") | 241 tap.Err = errors.New("test error") |
| 178 | 242 |
| 179 // Ensure that all of these tasks get ad
ded to the task queue. | 243 // Ensure that all of these tasks get ad
ded to the task queue. |
| 180 » » » » » resp, err := http.Get(endpoint) | 244 » » » » » resp, err := http.Post(endpoint, "", nil
) |
| 181 So(err, ShouldBeNil) | 245 So(err, ShouldBeNil) |
| 182 So(resp.StatusCode, ShouldEqual, http.St
atusInternalServerError) | 246 So(resp.StatusCode, ShouldEqual, http.St
atusInternalServerError) |
| 183 So(tap.StreamNames(), ShouldResemble, []
string{}) | 247 So(tap.StreamNames(), ShouldResemble, []
string{}) |
| 184 | 248 |
| 185 Convey(`And will schedule streams next r
un.`, func() { | 249 Convey(`And will schedule streams next r
un.`, func() { |
| 186 tap.Err = nil | 250 tap.Err = nil |
| 187 | 251 |
| 188 » » » » » » resp, err := http.Get(endpoint) | 252 » » » » » » resp, err := http.Post(endpoint,
"", nil) |
| 189 So(err, ShouldBeNil) | 253 So(err, ShouldBeNil) |
| 190 So(resp.StatusCode, ShouldEqual,
http.StatusOK) | 254 So(resp.StatusCode, ShouldEqual,
http.StatusOK) |
| 191 So(tap.StreamNames(), ShouldRese
mble, names) | 255 So(tap.StreamNames(), ShouldRese
mble, names) |
| 192 }) | 256 }) |
| 193 }) | 257 }) |
| 194 }) | 258 }) |
| 195 }) | 259 }) |
| 196 }) | 260 }) |
| 197 } | 261 } |
| OLD | NEW |