Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(51)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/archiveStream_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "testing" 9 "testing"
10 10
11 "github.com/luci/gae/filter/featureBreaker" 11 "github.com/luci/gae/filter/featureBreaker"
12 "github.com/luci/gae/impl/memory" 12 "github.com/luci/gae/impl/memory"
13 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/luci-go/appengine/logdog/coordinator" 14 "github.com/luci/luci-go/appengine/logdog/coordinator"
15 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest " 15 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
17 "github.com/luci/luci-go/common/clock/testclock"
17 "github.com/luci/luci-go/common/proto/logdog/svcconfig" 18 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
18 "github.com/luci/luci-go/server/auth" 19 "github.com/luci/luci-go/server/auth"
19 "github.com/luci/luci-go/server/auth/authtest" 20 "github.com/luci/luci-go/server/auth/authtest"
20 "golang.org/x/net/context" 21 "golang.org/x/net/context"
21 22
22 . "github.com/luci/luci-go/common/testing/assertions" 23 . "github.com/luci/luci-go/common/testing/assertions"
23 . "github.com/smartystreets/goconvey/convey" 24 . "github.com/smartystreets/goconvey/convey"
24 ) 25 )
25 26
26 func TestArchiveStream(t *testing.T) { 27 func TestArchiveStream(t *testing.T) {
27 t.Parallel() 28 t.Parallel()
28 29
29 Convey(`With a testing configuration`, t, func() { 30 Convey(`With a testing configuration`, t, func() {
30 » » c := memory.Use(context.Background()) 31 » » c, tc := testclock.UseTime(context.Background(), testclock.TestT imeUTC)
32 » » c = memory.Use(c)
31 be := Server{} 33 be := Server{}
32 34
33 » » c = ct.UseConfig(c, &svcconfig.Coordinator{ 35 » » now := ds.RoundTime(tc.Now().UTC())
36 » » ccfg := svcconfig.Coordinator{
34 ServiceAuthGroup: "test-services", 37 ServiceAuthGroup: "test-services",
35 » » }) 38 » » » ArchiveRetries: 3,
39 » » }
40 » » c = ct.UseConfig(c, &ccfg)
41
36 fs := authtest.FakeState{} 42 fs := authtest.FakeState{}
37 c = auth.WithState(c, &fs) 43 c = auth.WithState(c, &fs)
38 44
39 » » // Register a testing log stream (not archived). 45 » » // Register a testing log stream with an archive tasked.
40 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, "foo")) 46 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, "foo"))
41 » » if err := ls.Put(ds.Get(c)); err != nil { 47 » » ls.State = coordinator.LSArchiveTasked
48 » » if err := ds.Get(c).Put(ls); err != nil {
42 panic(err) 49 panic(err)
43 } 50 }
44 51
45 req := &logdog.ArchiveStreamRequest{ 52 req := &logdog.ArchiveStreamRequest{
46 Path: string(ls.Path()), 53 Path: string(ls.Path()),
47 Complete: true,
48 TerminalIndex: 13, 54 TerminalIndex: 13,
55 LogEntryCount: 14,
49 StreamUrl: "gs://fake.stream", 56 StreamUrl: "gs://fake.stream",
50 StreamSize: 10, 57 StreamSize: 10,
51 IndexUrl: "gs://fake.index", 58 IndexUrl: "gs://fake.index",
52 IndexSize: 20, 59 IndexSize: 20,
53 DataUrl: "gs://fake.data", 60 DataUrl: "gs://fake.data",
54 DataSize: 30, 61 DataSize: 30,
55 } 62 }
56 63
57 Convey(`Returns Forbidden error if not a service.`, func() { 64 Convey(`Returns Forbidden error if not a service.`, func() {
58 _, err := be.ArchiveStream(c, req) 65 _, err := be.ArchiveStream(c, req)
59 So(err, ShouldBeRPCPermissionDenied) 66 So(err, ShouldBeRPCPermissionDenied)
60 }) 67 })
61 68
62 Convey(`When logged in as a service`, func() { 69 Convey(`When logged in as a service`, func() {
63 fs.IdentityGroups = []string{"test-services"} 70 fs.IdentityGroups = []string{"test-services"}
64 71
65 Convey(`Will mark the stream as archived.`, func() { 72 Convey(`Will mark the stream as archived.`, func() {
66 _, err := be.ArchiveStream(c, req) 73 _, err := be.ArchiveStream(c, req)
67 So(err, ShouldBeNil) 74 So(err, ShouldBeNil)
68 75
69 So(ds.Get(c).Get(ls), ShouldBeNil) 76 So(ds.Get(c).Get(ls), ShouldBeNil)
77 So(ls.Terminated(), ShouldBeTrue)
70 So(ls.Archived(), ShouldBeTrue) 78 So(ls.Archived(), ShouldBeTrue)
71 » » » » So(ls.ArchiveWhole, ShouldBeTrue) 79 » » » » So(ls.ArchiveComplete(), ShouldBeTrue)
80
81 » » » » So(ls.State, ShouldEqual, coordinator.LSArchived )
82 » » » » So(ls.TerminatedTime, ShouldResemble, now)
83 » » » » So(ls.ArchivedTime, ShouldResemble, now)
72 So(ls.TerminalIndex, ShouldEqual, 13) 84 So(ls.TerminalIndex, ShouldEqual, 13)
85 So(ls.ArchiveLogEntryCount, ShouldEqual, 14)
73 So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake. stream") 86 So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake. stream")
74 So(ls.ArchiveStreamSize, ShouldEqual, 10) 87 So(ls.ArchiveStreamSize, ShouldEqual, 10)
75 So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.i ndex") 88 So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.i ndex")
89 So(ls.ArchiveIndexSize, ShouldEqual, 20)
90 So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.da ta")
91 So(ls.ArchiveDataSize, ShouldEqual, 30)
92 })
93
94 Convey(`Will mark the stream as partially archived if no t complete.`, func() {
95 req.LogEntryCount = 13
96
97 _, err := be.ArchiveStream(c, req)
98 So(err, ShouldBeNil)
99
100 So(ds.Get(c).Get(ls), ShouldBeNil)
101 So(ls.Terminated(), ShouldBeTrue)
102 So(ls.Archived(), ShouldBeTrue)
103 So(ls.ArchiveComplete(), ShouldBeFalse)
104
105 So(ls.State, ShouldEqual, coordinator.LSArchived )
106 So(ls.TerminatedTime, ShouldResemble, now)
107 So(ls.ArchivedTime, ShouldResemble, now)
108 So(ls.TerminalIndex, ShouldEqual, 13)
109 So(ls.ArchiveLogEntryCount, ShouldEqual, 13)
110 So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake. stream")
111 So(ls.ArchiveStreamSize, ShouldEqual, 10)
112 So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.i ndex")
76 So(ls.ArchiveIndexSize, ShouldEqual, 20) 113 So(ls.ArchiveIndexSize, ShouldEqual, 20)
77 So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.da ta") 114 So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.da ta")
78 So(ls.ArchiveDataSize, ShouldEqual, 30) 115 So(ls.ArchiveDataSize, ShouldEqual, 30)
79 }) 116 })
80 117
81 Convey(`Will refuse to process an invalid stream path.`, func() { 118 Convey(`Will refuse to process an invalid stream path.`, func() {
82 req.Path = "!!!invalid!!!" 119 req.Path = "!!!invalid!!!"
83 _, err := be.ArchiveStream(c, req) 120 _, err := be.ArchiveStream(c, req)
84 So(err, ShouldBeRPCInvalidArgument, "invalid log stream path") 121 So(err, ShouldBeRPCInvalidArgument, "invalid log stream path")
85 }) 122 })
86 123
87 Convey(`If index URL is missing, will refuse to mark the stream archived.`, func() { 124 Convey(`If index URL is missing, will refuse to mark the stream archived.`, func() {
88 req.IndexUrl = "" 125 req.IndexUrl = ""
89 126
90 _, err := be.ArchiveStream(c, req) 127 _, err := be.ArchiveStream(c, req)
91 So(err, ShouldBeRPCInvalidArgument) 128 So(err, ShouldBeRPCInvalidArgument)
92 }) 129 })
93 130
94 Convey(`If stream URL is missing, will refuse to mark th e stream archived.`, func() { 131 Convey(`If stream URL is missing, will refuse to mark th e stream archived.`, func() {
95 req.StreamUrl = "" 132 req.StreamUrl = ""
96 133
97 _, err := be.ArchiveStream(c, req) 134 _, err := be.ArchiveStream(c, req)
98 So(err, ShouldBeRPCInvalidArgument) 135 So(err, ShouldBeRPCInvalidArgument)
99 }) 136 })
100 137
101 Convey(`If stream is already archived, will not update a nd return success.`, func() { 138 Convey(`If stream is already archived, will not update a nd return success.`, func() {
102 ls.State = coordinator.LSArchived 139 ls.State = coordinator.LSArchived
103 ls.TerminalIndex = 1337 140 ls.TerminalIndex = 1337
141 ls.ArchiveLogEntryCount = 42
142 ls.ArchivedTime = now
143 ls.TerminatedTime = now
144 So(ds.Get(c).Put(ls), ShouldBeNil)
145 So(ls.Terminated(), ShouldBeTrue)
104 So(ls.Archived(), ShouldBeTrue) 146 So(ls.Archived(), ShouldBeTrue)
105 So(ls.Put(ds.Get(c)), ShouldBeNil)
106 147
107 _, err := be.ArchiveStream(c, req) 148 _, err := be.ArchiveStream(c, req)
108 So(err, ShouldBeNil) 149 So(err, ShouldBeNil)
109 150
110 ls.TerminalIndex = -1 // To make sure it reloade d. 151 ls.TerminalIndex = -1 // To make sure it reloade d.
111 So(ds.Get(c).Get(ls), ShouldBeNil) 152 So(ds.Get(c).Get(ls), ShouldBeNil)
153 So(ls.Terminated(), ShouldBeTrue)
112 So(ls.Archived(), ShouldBeTrue) 154 So(ls.Archived(), ShouldBeTrue)
155
156 So(ls.State, ShouldEqual, coordinator.LSArchived )
113 So(ls.TerminalIndex, ShouldEqual, 1337) 157 So(ls.TerminalIndex, ShouldEqual, 1337)
158 So(ls.ArchiveLogEntryCount, ShouldEqual, 42)
159 })
160
161 Convey(`If the archive has failed`, func() {
162 req.Error = true
163
164 Convey(`If the stream is below error threshold, will increment error count and return Aborted.`, func() {
165 ls.ArchiveErrors = 0
166 So(ds.Get(c).Put(ls), ShouldBeNil)
167
168 _, err := be.ArchiveStream(c, req)
169 So(err, ShouldBeRPCAborted)
170
171 So(ds.Get(c).Get(ls), ShouldBeNil)
172 So(ls.Archived(), ShouldBeFalse)
173
174 So(ls.State, ShouldEqual, coordinator.LS ArchiveTasked)
175 So(ls.ArchiveErrors, ShouldEqual, 1)
176 })
177
178 Convey(`If the stream is above error threshold, will succeed with empty archival.`, func() {
179 ls.ArchiveErrors = int(ccfg.ArchiveRetri es)
180 So(ds.Get(c).Put(ls), ShouldBeNil)
181
182 _, err := be.ArchiveStream(c, req)
183 So(err, ShouldBeNil)
184 So(ds.Get(c).Get(ls), ShouldBeNil)
185 So(ls.Archived(), ShouldBeTrue)
186
187 So(ls.State, ShouldEqual, coordinator.LS Archived)
188 So(ls.ArchiveErrors, ShouldEqual, ccfg.A rchiveRetries+1)
189 So(ls.TerminalIndex, ShouldEqual, -1)
190 So(ls.ArchiveLogEntryCount, ShouldEqual, 0)
191 })
114 }) 192 })
115 193
116 Convey(`When datastore Get fails, returns internal error .`, func() { 194 Convey(`When datastore Get fails, returns internal error .`, func() {
117 c, fb := featureBreaker.FilterRDS(c, nil) 195 c, fb := featureBreaker.FilterRDS(c, nil)
118 fb.BreakFeatures(errors.New("test error"), "GetM ulti") 196 fb.BreakFeatures(errors.New("test error"), "GetM ulti")
119 197
120 _, err := be.ArchiveStream(c, req) 198 _, err := be.ArchiveStream(c, req)
121 So(err, ShouldBeRPCInternal) 199 So(err, ShouldBeRPCInternal)
122 }) 200 })
123 201
124 Convey(`When datastore Put fails, returns internal error .`, func() { 202 Convey(`When datastore Put fails, returns internal error .`, func() {
125 c, fb := featureBreaker.FilterRDS(c, nil) 203 c, fb := featureBreaker.FilterRDS(c, nil)
126 fb.BreakFeatures(errors.New("test error"), "PutM ulti") 204 fb.BreakFeatures(errors.New("test error"), "PutM ulti")
127 205
128 _, err := be.ArchiveStream(c, req) 206 _, err := be.ArchiveStream(c, req)
129 So(err, ShouldBeRPCInternal) 207 So(err, ShouldBeRPCInternal)
130 }) 208 })
131 }) 209 })
132 }) 210 })
133 } 211 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698