Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(56)

Side by Side Diff: appengine/logdog/coordinator/backend/archiveCron_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package backend 5 package backend
6 6
7 import ( 7 import (
8 "fmt" 8 "fmt"
9 "net/http" 9 "net/http"
10 "net/http/httptest" 10 "net/http/httptest"
(...skipping 22 matching lines...) Expand all
33 Convey(`A testing environment`, t, func() { 33 Convey(`A testing environment`, t, func() {
34 c := gaetesting.TestingContext() 34 c := gaetesting.TestingContext()
35 c, _ = testclock.UseTime(c, testclock.TestTimeUTC) 35 c, _ = testclock.UseTime(c, testclock.TestTimeUTC)
36 36
37 // Add the archival index from "index.yaml". 37 // Add the archival index from "index.yaml".
38 ds.Get(c).Testable().AddIndexes( 38 ds.Get(c).Testable().AddIndexes(
39 &ds.IndexDefinition{ 39 &ds.IndexDefinition{
40 Kind: "LogStream", 40 Kind: "LogStream",
41 SortBy: []ds.IndexColumn{ 41 SortBy: []ds.IndexColumn{
42 {Property: "State"}, 42 {Property: "State"},
43 » » » » » {Property: "Updated"}, 43 » » » » » {Property: "Created", Descending: true},
44 }, 44 },
45 }, 45 },
46 ) 46 )
47 47
48 // Install a base set of streams. 48 // Install a base set of streams.
49 //
50 // If the stream was created >= 10min ago, it will be candidate for
51 // archival.
52 now := clock.Now(c)
49 for _, v := range []struct { 53 for _, v := range []struct {
50 » » » name string 54 » » » name string
51 » » » d time.Duration 55 » » » d time.Duration // Time before "now" for this stream s Created.
52 » » » term bool 56 » » » state coordinator.LogStreamState
53 }{ 57 }{
54 » » » {"foo", 0, true}, // Not candidate for a rchival (too soon). 58 » » » {"foo", 0, coordinator.LSStreaming}, // Not candidate for archival (too soon).
55 » » » {"bar", 10 * time.Minute, true}, // Candidate for archi val. 59 » » » {"bar", 10 * time.Minute, coordinator.LSStreaming}, // Candidate for archival.
56 » » » {"baz", 10 * time.Minute, false}, // Not candidate for a rchival (not terminal). 60 » » » {"baz", 10 * time.Minute, coordinator.LSArchiveTasked}, // Not candidate for archival (archive tasked).
57 » » » {"qux", 24 * time.Hour, false}, // Candidate for non-t erminal archival. 61 » » » {"qux", 10 * time.Hour, coordinator.LSArchived}, // Not candidate for archival (archived).
62 » » » {"quux", 10 * time.Hour, coordinator.LSStreaming}, // Candidate for archival.
58 } { 63 } {
59 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, v.name)) 64 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, v.name))
60 65
61 // The entry was created a week ago. 66 // The entry was created a week ago.
62 » » » ls.Created = clock.Now(c).Add(-(7 * 24 * time.Hour)) 67 » » » ls.Created = now.Add(-v.d)
63 » » » ls.Updated = clock.Now(c).Add(-v.d) 68 » » » ls.State = v.state
64 » » » if v.term { 69 » » » ls.TerminatedTime = now // Doesn't matter, but required to Put the stream.
65 » » » » ls.State = coordinator.LSTerminated 70 » » » ls.ArchivedTime = now // Doesn't matter, but required to Put the stream.
66 » » » } 71 » » » if err := ds.Get(c).Put(ls); err != nil {
67 » » » if err := ls.Put(ds.Get(c)); err != nil {
68 panic(err) 72 panic(err)
69 } 73 }
70 } 74 }
71 ds.Get(c).Testable().CatchupIndexes() 75 ds.Get(c).Testable().CatchupIndexes()
72 76
73 // This allows us to update our Context in test setup. 77 // This allows us to update our Context in test setup.
74 » » b := Backend{ 78 » » b := Backend{}
75 » » » multiTaskBatchSize: 5,
76 » » }
77 79
78 tb := testBase{Context: c} 80 tb := testBase{Context: c}
79 r := httprouter.New() 81 r := httprouter.New()
80 b.InstallHandlers(r, tb.base) 82 b.InstallHandlers(r, tb.base)
81 83
82 s := httptest.NewServer(r) 84 s := httptest.NewServer(r)
83 defer s.Close() 85 defer s.Close()
84 86
85 » » Convey(`With no configuration loaded`, func() { 87 » » endpoint := fmt.Sprintf("%s/archive/cron", s.URL)
86 » » » for _, ep := range []string{"terminal", "nonterminal"} { 88
87 » » » » Convey(fmt.Sprintf(`A %q endpoint hit will fail. `, ep), func() { 89 » » Convey(`With no configuration loaded, the endpoint will fail.`, func() {
88 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar chive/cron/%s", s.URL, ep)) 90 » » » resp, err := http.Get(endpoint)
89 » » » » » So(err, ShouldBeNil) 91 » » » So(err, ShouldBeNil)
90 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusInternalServerError) 92 » » » So(resp.StatusCode, ShouldEqual, http.StatusInternalServ erError)
91 » » » » })
92 » » » }
93 }) 93 })
94 94
95 Convey(`With a configuration loaded`, func() { 95 Convey(`With a configuration loaded`, func() {
96 qName := "archive-test-queue" 96 qName := "archive-test-queue"
97 c = ct.UseConfig(c, &svcconfig.Coordinator{ 97 c = ct.UseConfig(c, &svcconfig.Coordinator{
98 ArchiveDelay: google.NewDuration(10 * time.M inute),
99 ArchiveDelayMax: google.NewDuration(24 * time.H our),
100 ArchiveTaskQueue: qName, 98 ArchiveTaskQueue: qName,
99 ArchiveDelayMax: google.NewDuration(10 * time.M inute),
101 }) 100 })
102 tb.Context = c 101 tb.Context = c
103 102
104 » » » Convey(`With no task queue configured`, func() { 103 » » » Convey(`With no task queue configured, the endpoint will fail.`, func() {
105 » » » » for _, ep := range []string{"terminal", "nonterm inal", "purge"} { 104 » » » » resp, err := http.Get(endpoint)
106 » » » » » Convey(fmt.Sprintf(`A %q endpoint hit wi ll fail.`, ep), func() { 105 » » » » So(err, ShouldBeNil)
107 » » » » » » resp, err := http.Get(fmt.Sprint f("%s/archive/cron/%s", s.URL, ep)) 106 » » » » So(resp.StatusCode, ShouldEqual, http.StatusInte rnalServerError)
108 » » » » » » So(err, ShouldBeNil)
109 » » » » » » So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
110 » » » » » })
111 » » » » }
112 }) 107 })
113 108
114 Convey(`With a task queue configured`, func() { 109 Convey(`With a task queue configured`, func() {
115 tq.Get(c).Testable().CreateQueue(qName) 110 tq.Get(c).Testable().CreateQueue(qName)
116 111
117 » » » » Convey(`A terminal endpoint hit will be successf ul and idempotent.`, func() { 112 » » » » streams := func() []string {
118 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar chive/cron/terminal", s.URL)) 113 » » » » » s, err := ct.GetArchiveTaskStreams(tq.Ge t(c), qName)
114 » » » » » if err != nil {
115 » » » » » » panic(err)
116 » » » » » }
117 » » » » » return s
118 » » » » }
119
120 » » » » Convey(`A request to the endpoint will be succes sful.`, func() {
121 » » » » » resp, err := http.Get(endpoint)
119 So(err, ShouldBeNil) 122 So(err, ShouldBeNil)
120 So(resp.StatusCode, ShouldEqual, http.St atusOK) 123 So(resp.StatusCode, ShouldEqual, http.St atusOK)
121 124
122 // Candidate tasks should be scheduled. 125 // Candidate tasks should be scheduled.
123 » » » » » tasks := tq.Get(c).Testable().GetSchedul edTasks()[qName] 126 » » » » » So(streams(), ShouldResemble, []string{" bar", "quux"})
124 » » » » » So(tasks, shouldHaveTasks, archiveTaskNa me("testing/+/bar"))
125 127
126 » » » » » // Hit the endpoint again, the same task s should be scheduled. 128 » » » » » // Hit the endpoint again, no additional tasks should be scheduled.
127 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc hive/cron/terminal", s.URL)) 129 » » » » » Convey(`A subsequent endpoint hit will n ot schedule any additional tasks.`, func() {
128 » » » » » So(err, ShouldBeNil) 130 » » » » » » resp, err = http.Get(endpoint)
129 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusOK) 131 » » » » » » So(err, ShouldBeNil)
130 132 » » » » » » So(resp.StatusCode, ShouldEqual, http.StatusOK)
131 » » » » » tasks2 := tq.Get(c).Testable().GetSchedu ledTasks()[qName] 133 » » » » » » So(streams(), ShouldResemble, [] string{"bar", "quux"})
132 » » » » » So(tasks2, ShouldResemble, tasks) 134 » » » » » })
133 » » » » })
134
135 » » » » Convey(`A non-terminal endpoint hit will be succ essful and idempotent.`, func() {
136 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar chive/cron/nonterminal", s.URL))
137 » » » » » So(err, ShouldBeNil)
138 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusOK)
139
140 » » » » » // Candidate tasks should be scheduled.
141 » » » » » tasks := tq.Get(c).Testable().GetSchedul edTasks()[qName]
142 » » » » » So(tasks, shouldHaveTasks, archiveTaskNa me("testing/+/qux"))
143
144 » » » » » // Hit the endpoint again, the same task s should be scheduled.
145 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc hive/cron/nonterminal", s.URL))
146 » » » » » So(err, ShouldBeNil)
147 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusOK)
148
149 » » » » » tasks2 := tq.Get(c).Testable().GetSchedu ledTasks()[qName]
150 » » » » » So(tasks2, ShouldResemble, tasks)
151 » » » » })
152
153 » » » » Convey(`A terminal endpoint hit followed by a no n-terminal endpoint hit will be successful.`, func() {
154 » » » » » resp, err := http.Get(fmt.Sprintf("%s/ar chive/cron/terminal", s.URL))
155 » » » » » So(err, ShouldBeNil)
156 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusOK)
157 » » » » » So(tq.Get(c).Testable().GetScheduledTask s()[qName], shouldHaveTasks, archiveTaskName("testing/+/bar"))
158
159 » » » » » resp, err = http.Get(fmt.Sprintf("%s/arc hive/cron/nonterminal", s.URL))
160 » » » » » So(err, ShouldBeNil)
161 » » » » » So(resp.StatusCode, ShouldEqual, http.St atusOK)
162 » » » » » So(tq.Get(c).Testable().GetScheduledTask s()[qName], shouldHaveTasks,
163 » » » » » » archiveTaskName("testing/+/bar") , archiveTaskName("testing/+/qux"))
164 }) 135 })
165 136
166 Convey(`When scheduling multiple tasks`, func() { 137 Convey(`When scheduling multiple tasks`, func() {
138 b.multiTaskBatchSize = 5
139
167 // Create a lot of archival candidate ta sks to schedule. 140 // Create a lot of archival candidate ta sks to schedule.
168 » » » » » var names []string 141 » » » » » //
142 » » » » » // Note that since task queue names are returned sorted, we want to
143 » » » » » // name these streams greater than our s tock stream names.
144 » » » » » names := []string{"bar", "quux"}
169 for i := 0; i < 11; i++ { 145 for i := 0; i < 11; i++ {
170 » » » » » » ls := ct.TestLogStream(c, ct.Tes tLogStreamDescriptor(c, fmt.Sprintf("stream-%d", i))) 146 » » » » » » ls := ct.TestLogStream(c, ct.Tes tLogStreamDescriptor(c, fmt.Sprintf("stream-%02d", i)))
171 147
172 » » » » » » ls.Created = clock.Now(c).Add(-( 10 * time.Minute)) 148 » » » » » » ls.Created = now.Add(-(10 * time .Minute))
173 » » » » » » ls.Updated = ls.Created 149 » » » » » » ls.State = coordinator.LSStreami ng
174 » » » » » » ls.State = coordinator.LSTermina ted
175 ls.TerminalIndex = 1337 150 ls.TerminalIndex = 1337
176 » » » » » » if err := ls.Put(ds.Get(c)); err != nil { 151 » » » » » » ls.TerminatedTime = now
152 » » » » » » if err := ds.Get(c).Put(ls); err != nil {
177 panic(err) 153 panic(err)
178 } 154 }
179 155
180 names = append(names, ls.Name) 156 names = append(names, ls.Name)
181 } 157 }
182 names = append(names, "bar")
183 ds.Get(c).Testable().CatchupIndexes() 158 ds.Get(c).Testable().CatchupIndexes()
184 159
185 Convey(`Will schedule all pages properly .`, func() { 160 Convey(`Will schedule all pages properly .`, func() {
186 » » » » » » taskNames := make([]interface{}, len(names)) 161 » » » » » » // Ensure that all of these task s get added to the task queue.
187 » » » » » » for i, n := range names { 162 » » » » » » resp, err := http.Get(endpoint)
188 » » » » » » » taskNames[i] = archiveTa skName(fmt.Sprintf("testing/+/%s", n)) 163 » » » » » » So(err, ShouldBeNil)
164 » » » » » » So(resp.StatusCode, ShouldEqual, http.StatusOK)
165 » » » » » » So(streams(), ShouldResemble, na mes)
166
167 » » » » » » // Verify that all tasks are con figured to execute immediately.
168 » » » » » » for _, t := range tq.Get(c).Test able().GetScheduledTasks()[qName] {
169 » » » » » » » So(t.ETA, ShouldResemble , now)
189 } 170 }
190 171
191 » » » » » » // Ensure that all of these task s get added to the task queue. 172 » » » » » » Convey(`Will not schedule additi onal tasks on the next run.`, func() {
192 » » » » » » resp, err := http.Get(fmt.Sprint f("%s/archive/cron/terminal", s.URL)) 173 » » » » » » » resp, err := http.Get(en dpoint)
193 » » » » » » So(err, ShouldBeNil)
194 » » » » » » So(resp.StatusCode, ShouldEqual, http.StatusOK)
195 » » » » » » So(tq.Get(c).Testable().GetSched uledTasks()[qName], shouldHaveTasks, taskNames...)
196
197 » » » » » » Convey(`Will be successful when rescheduling the same tasks.`, func() {
198 » » » » » » » resp, err := http.Get(fm t.Sprintf("%s/archive/cron/terminal", s.URL))
199 So(err, ShouldBeNil) 174 So(err, ShouldBeNil)
200 So(resp.StatusCode, Shou ldEqual, http.StatusOK) 175 So(resp.StatusCode, Shou ldEqual, http.StatusOK)
201 » » » » » » » So(tq.Get(c).Testable(). GetScheduledTasks()[qName], shouldHaveTasks, taskNames...) 176 » » » » » » » So(streams(), ShouldRese mble, names)
202 }) 177 })
203 }) 178 })
204 179
205 » » » » » Convey(`Will return an error if task sch eduling fails.`, func() { 180 » » » » » Convey(`Will not schedule tasks if task scheduling fails.`, func() {
206 c, fb := featureBreaker.FilterTQ (c, nil) 181 c, fb := featureBreaker.FilterTQ (c, nil)
207 tb.Context = c 182 tb.Context = c
208 fb.BreakFeatures(errors.New("tes t error"), "AddMulti") 183 fb.BreakFeatures(errors.New("tes t error"), "AddMulti")
209 184
210 // Ensure that all of these task s get added to the task queue. 185 // Ensure that all of these task s get added to the task queue.
211 » » » » » » resp, err := http.Get(fmt.Sprint f("%s/archive/cron/terminal", s.URL)) 186 » » » » » » resp, err := http.Get(endpoint)
212 So(err, ShouldBeNil) 187 So(err, ShouldBeNil)
213 So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) 188 So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
214 » » » » » }) 189 » » » » » » So(streams(), ShouldResemble, [] string{})
215 190
216 » » » » » Convey(`Will return an error if a single task scheduling fails.`, func() { 191 » » » » » » Convey(`And will schedule stream s next run.`, func() {
217 » » » » » » merr := make(errors.MultiError, len(names)) 192 » » » » » » » fb.UnbreakFeatures("AddM ulti")
218 » » » » » » merr[0] = errors.New("test error ")
219 193
220 » » » » » » c, fb := featureBreaker.FilterTQ (c, nil) 194 » » » » » » » resp, err := http.Get(en dpoint)
221 » » » » » » tb.Context = c 195 » » » » » » » So(err, ShouldBeNil)
222 » » » » » » fb.BreakFeatures(merr, "AddMulti ") 196 » » » » » » » So(resp.StatusCode, Shou ldEqual, http.StatusOK)
223 197 » » » » » » » So(streams(), ShouldRese mble, names)
224 » » » » » » // Ensure that all of these task s get added to the task queue. 198 » » » » » » })
225 » » » » » » resp, err := http.Get(fmt.Sprint f("%s/archive/cron/terminal", s.URL))
226 » » » » » » So(err, ShouldBeNil)
227 » » » » » » So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
228 » » » » » })
229
230 » » » » » Convey(`And a purge endpoint hit will pu rge the tasks.`, func() {
231 » » » » » » resp, err := http.Get(fmt.Sprint f("%s/archive/cron/purge", s.URL))
232 » » » » » » So(err, ShouldBeNil)
233 » » » » » » So(resp.StatusCode, ShouldEqual, http.StatusOK)
234 » » » » » » So(tq.Get(c).Testable().GetSched uledTasks()[qName], shouldHaveTasks)
235 }) 199 })
236 }) 200 })
237 }) 201 })
238 }) 202 })
239 }) 203 })
240 } 204 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698