| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package backend | 5 package backend |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "fmt" | 8 "fmt" |
| 9 "net/http" | 9 "net/http" |
| 10 "net/http/httptest" | 10 "net/http/httptest" |
| (...skipping 22 matching lines...) Expand all Loading... |
| 33 Convey(`A testing environment`, t, func() { | 33 Convey(`A testing environment`, t, func() { |
| 34 c := gaetesting.TestingContext() | 34 c := gaetesting.TestingContext() |
| 35 c, _ = testclock.UseTime(c, testclock.TestTimeUTC) | 35 c, _ = testclock.UseTime(c, testclock.TestTimeUTC) |
| 36 | 36 |
| 37 // Add the archival index from "index.yaml". | 37 // Add the archival index from "index.yaml". |
| 38 ds.Get(c).Testable().AddIndexes( | 38 ds.Get(c).Testable().AddIndexes( |
| 39 &ds.IndexDefinition{ | 39 &ds.IndexDefinition{ |
| 40 Kind: "LogStream", | 40 Kind: "LogStream", |
| 41 SortBy: []ds.IndexColumn{ | 41 SortBy: []ds.IndexColumn{ |
| 42 {Property: "State"}, | 42 {Property: "State"}, |
| 43 » » » » » {Property: "Updated"}, | 43 » » » » » {Property: "Created", Descending: true}, |
| 44 }, | 44 }, |
| 45 }, | 45 }, |
| 46 ) | 46 ) |
| 47 | 47 |
| 48 // Install a base set of streams. | 48 // Install a base set of streams. |
| 49 // |
| 50 // If the stream was created >= 10min ago, it will be candidate
for |
| 51 // archival. |
| 52 now := clock.Now(c) |
| 49 for _, v := range []struct { | 53 for _, v := range []struct { |
| 50 » » » name string | 54 » » » name string |
| 51 » » » d time.Duration | 55 » » » d time.Duration // Time before "now" for this stream
s Created. |
| 52 » » » term bool | 56 » » » state coordinator.LogStreamState |
| 53 }{ | 57 }{ |
| 54 » » » {"foo", 0, true}, // Not candidate for a
rchival (too soon). | 58 » » » {"foo", 0, coordinator.LSStreaming},
// Not candidate for archival (too soon). |
| 55 » » » {"bar", 10 * time.Minute, true}, // Candidate for archi
val. | 59 » » » {"bar", 10 * time.Minute, coordinator.LSStreaming},
// Candidate for archival. |
| 56 » » » {"baz", 10 * time.Minute, false}, // Not candidate for a
rchival (not terminal). | 60 » » » {"baz", 10 * time.Minute, coordinator.LSArchiveTasked},
// Not candidate for archival (archive tasked). |
| 57 » » » {"qux", 24 * time.Hour, false}, // Candidate for non-t
erminal archival. | 61 » » » {"qux", 10 * time.Hour, coordinator.LSArchived},
// Not candidate for archival (archived). |
| 62 » » » {"quux", 10 * time.Hour, coordinator.LSStreaming},
// Candidate for archival. |
| 58 } { | 63 } { |
| 59 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c,
v.name)) | 64 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c,
v.name)) |
| 60 | 65 |
| 61 // The entry was created a week ago. | 66 // The entry was created a week ago. |
| 62 » » » ls.Created = clock.Now(c).Add(-(7 * 24 * time.Hour)) | 67 » » » ls.Created = now.Add(-v.d) |
| 63 » » » ls.Updated = clock.Now(c).Add(-v.d) | 68 » » » ls.State = v.state |
| 64 » » » if v.term { | 69 » » » ls.TerminatedTime = now // Doesn't matter, but required
to Put the stream. |
| 65 » » » » ls.State = coordinator.LSTerminated | 70 » » » ls.ArchivedTime = now // Doesn't matter, but required
to Put the stream. |
| 66 » » » } | 71 » » » if err := ds.Get(c).Put(ls); err != nil { |
| 67 » » » if err := ls.Put(ds.Get(c)); err != nil { | |
| 68 panic(err) | 72 panic(err) |
| 69 } | 73 } |
| 70 } | 74 } |
| 71 ds.Get(c).Testable().CatchupIndexes() | 75 ds.Get(c).Testable().CatchupIndexes() |
| 72 | 76 |
| 73 // This allows us to update our Context in test setup. | 77 // This allows us to update our Context in test setup. |
| 74 » » b := Backend{ | 78 » » b := Backend{} |
| 75 » » » multiTaskBatchSize: 5, | |
| 76 » » } | |
| 77 | 79 |
| 78 tb := testBase{Context: c} | 80 tb := testBase{Context: c} |
| 79 r := httprouter.New() | 81 r := httprouter.New() |
| 80 b.InstallHandlers(r, tb.base) | 82 b.InstallHandlers(r, tb.base) |
| 81 | 83 |
| 82 s := httptest.NewServer(r) | 84 s := httptest.NewServer(r) |
| 83 defer s.Close() | 85 defer s.Close() |
| 84 | 86 |
| 85 » » Convey(`With no configuration loaded`, func() { | 87 » » endpoint := fmt.Sprintf("%s/archive/cron", s.URL) |
| 86 » » » for _, ep := range []string{"terminal", "nonterminal"} { | 88 |
| 87 » » » » Convey(fmt.Sprintf(`A %q endpoint hit will fail.
`, ep), func() { | 89 » » Convey(`With no configuration loaded, the endpoint will fail.`,
func() { |
| 88 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar
chive/cron/%s", s.URL, ep)) | 90 » » » resp, err := http.Get(endpoint) |
| 89 » » » » » So(err, ShouldBeNil) | 91 » » » So(err, ShouldBeNil) |
| 90 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusInternalServerError) | 92 » » » So(resp.StatusCode, ShouldEqual, http.StatusInternalServ
erError) |
| 91 » » » » }) | |
| 92 » » » } | |
| 93 }) | 93 }) |
| 94 | 94 |
| 95 Convey(`With a configuration loaded`, func() { | 95 Convey(`With a configuration loaded`, func() { |
| 96 qName := "archive-test-queue" | 96 qName := "archive-test-queue" |
| 97 c = ct.UseConfig(c, &svcconfig.Coordinator{ | 97 c = ct.UseConfig(c, &svcconfig.Coordinator{ |
| 98 ArchiveDelay: google.NewDuration(10 * time.M
inute), | |
| 99 ArchiveDelayMax: google.NewDuration(24 * time.H
our), | |
| 100 ArchiveTaskQueue: qName, | 98 ArchiveTaskQueue: qName, |
| 99 ArchiveDelayMax: google.NewDuration(10 * time.M
inute), |
| 101 }) | 100 }) |
| 102 tb.Context = c | 101 tb.Context = c |
| 103 | 102 |
| 104 » » » Convey(`With no task queue configured`, func() { | 103 » » » Convey(`With no task queue configured, the endpoint will
fail.`, func() { |
| 105 » » » » for _, ep := range []string{"terminal", "nonterm
inal", "purge"} { | 104 » » » » resp, err := http.Get(endpoint) |
| 106 » » » » » Convey(fmt.Sprintf(`A %q endpoint hit wi
ll fail.`, ep), func() { | 105 » » » » So(err, ShouldBeNil) |
| 107 » » » » » » resp, err := http.Get(fmt.Sprint
f("%s/archive/cron/%s", s.URL, ep)) | 106 » » » » So(resp.StatusCode, ShouldEqual, http.StatusInte
rnalServerError) |
| 108 » » » » » » So(err, ShouldBeNil) | |
| 109 » » » » » » So(resp.StatusCode, ShouldEqual,
http.StatusInternalServerError) | |
| 110 » » » » » }) | |
| 111 » » » » } | |
| 112 }) | 107 }) |
| 113 | 108 |
| 114 Convey(`With a task queue configured`, func() { | 109 Convey(`With a task queue configured`, func() { |
| 115 tq.Get(c).Testable().CreateQueue(qName) | 110 tq.Get(c).Testable().CreateQueue(qName) |
| 116 | 111 |
| 117 » » » » Convey(`A terminal endpoint hit will be successf
ul and idempotent.`, func() { | 112 » » » » streams := func() []string { |
| 118 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar
chive/cron/terminal", s.URL)) | 113 » » » » » s, err := ct.GetArchiveTaskStreams(tq.Ge
t(c), qName) |
| 114 » » » » » if err != nil { |
| 115 » » » » » » panic(err) |
| 116 » » » » » } |
| 117 » » » » » return s |
| 118 » » » » } |
| 119 |
| 120 » » » » Convey(`A request to the endpoint will be succes
sful.`, func() { |
| 121 » » » » » resp, err := http.Get(endpoint) |
| 119 So(err, ShouldBeNil) | 122 So(err, ShouldBeNil) |
| 120 So(resp.StatusCode, ShouldEqual, http.St
atusOK) | 123 So(resp.StatusCode, ShouldEqual, http.St
atusOK) |
| 121 | 124 |
| 122 // Candidate tasks should be scheduled. | 125 // Candidate tasks should be scheduled. |
| 123 » » » » » tasks := tq.Get(c).Testable().GetSchedul
edTasks()[qName] | 126 » » » » » So(streams(), ShouldResemble, []string{"
bar", "quux"}) |
| 124 » » » » » So(tasks, shouldHaveTasks, archiveTaskNa
me("testing/+/bar")) | |
| 125 | 127 |
| 126 » » » » » // Hit the endpoint again, the same task
s should be scheduled. | 128 » » » » » // Hit the endpoint again, no additional
tasks should be scheduled. |
| 127 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc
hive/cron/terminal", s.URL)) | 129 » » » » » Convey(`A subsequent endpoint hit will n
ot schedule any additional tasks.`, func() { |
| 128 » » » » » So(err, ShouldBeNil) | 130 » » » » » » resp, err = http.Get(endpoint) |
| 129 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusOK) | 131 » » » » » » So(err, ShouldBeNil) |
| 130 | 132 » » » » » » So(resp.StatusCode, ShouldEqual,
http.StatusOK) |
| 131 » » » » » tasks2 := tq.Get(c).Testable().GetSchedu
ledTasks()[qName] | 133 » » » » » » So(streams(), ShouldResemble, []
string{"bar", "quux"}) |
| 132 » » » » » So(tasks2, ShouldResemble, tasks) | 134 » » » » » }) |
| 133 » » » » }) | |
| 134 | |
| 135 » » » » Convey(`A non-terminal endpoint hit will be succ
essful and idempotent.`, func() { | |
| 136 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar
chive/cron/nonterminal", s.URL)) | |
| 137 » » » » » So(err, ShouldBeNil) | |
| 138 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusOK) | |
| 139 | |
| 140 » » » » » // Candidate tasks should be scheduled. | |
| 141 » » » » » tasks := tq.Get(c).Testable().GetSchedul
edTasks()[qName] | |
| 142 » » » » » So(tasks, shouldHaveTasks, archiveTaskNa
me("testing/+/qux")) | |
| 143 | |
| 144 » » » » » // Hit the endpoint again, the same task
s should be scheduled. | |
| 145 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc
hive/cron/nonterminal", s.URL)) | |
| 146 » » » » » So(err, ShouldBeNil) | |
| 147 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusOK) | |
| 148 | |
| 149 » » » » » tasks2 := tq.Get(c).Testable().GetSchedu
ledTasks()[qName] | |
| 150 » » » » » So(tasks2, ShouldResemble, tasks) | |
| 151 » » » » }) | |
| 152 | |
| 153 » » » » Convey(`A terminal endpoint hit followed by a no
n-terminal endpoint hit will be successful.`, func() { | |
| 154 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar
chive/cron/terminal", s.URL)) | |
| 155 » » » » » So(err, ShouldBeNil) | |
| 156 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusOK) | |
| 157 » » » » » So(tq.Get(c).Testable().GetScheduledTask
s()[qName], shouldHaveTasks, archiveTaskName("testing/+/bar")) | |
| 158 | |
| 159 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc
hive/cron/nonterminal", s.URL)) | |
| 160 » » » » » So(err, ShouldBeNil) | |
| 161 » » » » » So(resp.StatusCode, ShouldEqual, http.St
atusOK) | |
| 162 » » » » » So(tq.Get(c).Testable().GetScheduledTask
s()[qName], shouldHaveTasks, | |
| 163 » » » » » » archiveTaskName("testing/+/bar")
, archiveTaskName("testing/+/qux")) | |
| 164 }) | 135 }) |
| 165 | 136 |
| 166 Convey(`When scheduling multiple tasks`, func()
{ | 137 Convey(`When scheduling multiple tasks`, func()
{ |
| 138 b.multiTaskBatchSize = 5 |
| 139 |
| 167 // Create a lot of archival candidate ta
sks to schedule. | 140 // Create a lot of archival candidate ta
sks to schedule. |
| 168 » » » » » var names []string | 141 » » » » » // |
| 142 » » » » » // Note that since task queue names are
returned sorted, we want to |
| 143 » » » » » // name these streams greater than our s
tock stream names. |
| 144 » » » » » names := []string{"bar", "quux"} |
| 169 for i := 0; i < 11; i++ { | 145 for i := 0; i < 11; i++ { |
| 170 » » » » » » ls := ct.TestLogStream(c, ct.Tes
tLogStreamDescriptor(c, fmt.Sprintf("stream-%d", i))) | 146 » » » » » » ls := ct.TestLogStream(c, ct.Tes
tLogStreamDescriptor(c, fmt.Sprintf("stream-%02d", i))) |
| 171 | 147 |
| 172 » » » » » » ls.Created = clock.Now(c).Add(-(
10 * time.Minute)) | 148 » » » » » » ls.Created = now.Add(-(10 * time
.Minute)) |
| 173 » » » » » » ls.Updated = ls.Created | 149 » » » » » » ls.State = coordinator.LSStreami
ng |
| 174 » » » » » » ls.State = coordinator.LSTermina
ted | |
| 175 ls.TerminalIndex = 1337 | 150 ls.TerminalIndex = 1337 |
| 176 » » » » » » if err := ls.Put(ds.Get(c)); err
!= nil { | 151 » » » » » » ls.TerminatedTime = now |
| 152 » » » » » » if err := ds.Get(c).Put(ls); err
!= nil { |
| 177 panic(err) | 153 panic(err) |
| 178 } | 154 } |
| 179 | 155 |
| 180 names = append(names, ls.Name) | 156 names = append(names, ls.Name) |
| 181 } | 157 } |
| 182 names = append(names, "bar") | |
| 183 ds.Get(c).Testable().CatchupIndexes() | 158 ds.Get(c).Testable().CatchupIndexes() |
| 184 | 159 |
| 185 Convey(`Will schedule all pages properly
.`, func() { | 160 Convey(`Will schedule all pages properly
.`, func() { |
| 186 » » » » » » taskNames := make([]interface{},
len(names)) | 161 » » » » » » // Ensure that all of these task
s get added to the task queue. |
| 187 » » » » » » for i, n := range names { | 162 » » » » » » resp, err := http.Get(endpoint) |
| 188 » » » » » » » taskNames[i] = archiveTa
skName(fmt.Sprintf("testing/+/%s", n)) | 163 » » » » » » So(err, ShouldBeNil) |
| 164 » » » » » » So(resp.StatusCode, ShouldEqual,
http.StatusOK) |
| 165 » » » » » » So(streams(), ShouldResemble, na
mes) |
| 166 |
| 167 » » » » » » // Verify that all tasks are con
figured to execute immediately. |
| 168 » » » » » » for _, t := range tq.Get(c).Test
able().GetScheduledTasks()[qName] { |
| 169 » » » » » » » So(t.ETA, ShouldResemble
, now) |
| 189 } | 170 } |
| 190 | 171 |
| 191 » » » » » » // Ensure that all of these task
s get added to the task queue. | 172 » » » » » » Convey(`Will not schedule additi
onal tasks on the next run.`, func() { |
| 192 » » » » » » resp, err := http.Get(fmt.Sprint
f("%s/archive/cron/terminal", s.URL)) | 173 » » » » » » » resp, err := http.Get(en
dpoint) |
| 193 » » » » » » So(err, ShouldBeNil) | |
| 194 » » » » » » So(resp.StatusCode, ShouldEqual,
http.StatusOK) | |
| 195 » » » » » » So(tq.Get(c).Testable().GetSched
uledTasks()[qName], shouldHaveTasks, taskNames...) | |
| 196 | |
| 197 » » » » » » Convey(`Will be successful when
rescheduling the same tasks.`, func() { | |
| 198 » » » » » » » resp, err := http.Get(fm
t.Sprintf("%s/archive/cron/terminal", s.URL)) | |
| 199 So(err, ShouldBeNil) | 174 So(err, ShouldBeNil) |
| 200 So(resp.StatusCode, Shou
ldEqual, http.StatusOK) | 175 So(resp.StatusCode, Shou
ldEqual, http.StatusOK) |
| 201 » » » » » » » So(tq.Get(c).Testable().
GetScheduledTasks()[qName], shouldHaveTasks, taskNames...) | 176 » » » » » » » So(streams(), ShouldRese
mble, names) |
| 202 }) | 177 }) |
| 203 }) | 178 }) |
| 204 | 179 |
| 205 » » » » » Convey(`Will return an error if task sch
eduling fails.`, func() { | 180 » » » » » Convey(`Will not schedule tasks if task
scheduling fails.`, func() { |
| 206 c, fb := featureBreaker.FilterTQ
(c, nil) | 181 c, fb := featureBreaker.FilterTQ
(c, nil) |
| 207 tb.Context = c | 182 tb.Context = c |
| 208 fb.BreakFeatures(errors.New("tes
t error"), "AddMulti") | 183 fb.BreakFeatures(errors.New("tes
t error"), "AddMulti") |
| 209 | 184 |
| 210 // Ensure that all of these task
s get added to the task queue. | 185 // Ensure that all of these task
s get added to the task queue. |
| 211 » » » » » » resp, err := http.Get(fmt.Sprint
f("%s/archive/cron/terminal", s.URL)) | 186 » » » » » » resp, err := http.Get(endpoint) |
| 212 So(err, ShouldBeNil) | 187 So(err, ShouldBeNil) |
| 213 So(resp.StatusCode, ShouldEqual,
http.StatusInternalServerError) | 188 So(resp.StatusCode, ShouldEqual,
http.StatusInternalServerError) |
| 214 » » » » » }) | 189 » » » » » » So(streams(), ShouldResemble, []
string{}) |
| 215 | 190 |
| 216 » » » » » Convey(`Will return an error if a single
task scheduling fails.`, func() { | 191 » » » » » » Convey(`And will schedule stream
s next run.`, func() { |
| 217 » » » » » » merr := make(errors.MultiError,
len(names)) | 192 » » » » » » » fb.UnbreakFeatures("AddM
ulti") |
| 218 » » » » » » merr[0] = errors.New("test error
") | |
| 219 | 193 |
| 220 » » » » » » c, fb := featureBreaker.FilterTQ
(c, nil) | 194 » » » » » » » resp, err := http.Get(en
dpoint) |
| 221 » » » » » » tb.Context = c | 195 » » » » » » » So(err, ShouldBeNil) |
| 222 » » » » » » fb.BreakFeatures(merr, "AddMulti
") | 196 » » » » » » » So(resp.StatusCode, Shou
ldEqual, http.StatusOK) |
| 223 | 197 » » » » » » » So(streams(), ShouldRese
mble, names) |
| 224 » » » » » » // Ensure that all of these task
s get added to the task queue. | 198 » » » » » » }) |
| 225 » » » » » » resp, err := http.Get(fmt.Sprint
f("%s/archive/cron/terminal", s.URL)) | |
| 226 » » » » » » So(err, ShouldBeNil) | |
| 227 » » » » » » So(resp.StatusCode, ShouldEqual,
http.StatusInternalServerError) | |
| 228 » » » » » }) | |
| 229 | |
| 230 » » » » » Convey(`And a purge endpoint hit will pu
rge the tasks.`, func() { | |
| 231 » » » » » » resp, err := http.Get(fmt.Sprint
f("%s/archive/cron/purge", s.URL)) | |
| 232 » » » » » » So(err, ShouldBeNil) | |
| 233 » » » » » » So(resp.StatusCode, ShouldEqual,
http.StatusOK) | |
| 234 » » » » » » So(tq.Get(c).Testable().GetSched
uledTasks()[qName], shouldHaveTasks) | |
| 235 }) | 199 }) |
| 236 }) | 200 }) |
| 237 }) | 201 }) |
| 238 }) | 202 }) |
| 239 }) | 203 }) |
| 240 } | 204 } |
| OLD | NEW |