Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(586)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/archiveStream_test.go

Issue 1853433002: LogDog: Handle archive failures. (Closed) Base URL: https://github.com/luci/luci-go@logdog-gs-update
Patch Set: Regenerate protobufs. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "testing" 9 "testing"
10 10
11 "github.com/luci/gae/filter/featureBreaker" 11 "github.com/luci/gae/filter/featureBreaker"
12 "github.com/luci/gae/impl/memory" 12 "github.com/luci/gae/impl/memory"
13 ds "github.com/luci/gae/service/datastore" 13 ds "github.com/luci/gae/service/datastore"
14 "github.com/luci/luci-go/appengine/logdog/coordinator" 14 "github.com/luci/luci-go/appengine/logdog/coordinator"
15 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest " 15 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 16 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
17 "github.com/luci/luci-go/common/logging/gologger"
17 "github.com/luci/luci-go/common/proto/logdog/svcconfig" 18 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
18 "github.com/luci/luci-go/server/auth" 19 "github.com/luci/luci-go/server/auth"
19 "github.com/luci/luci-go/server/auth/authtest" 20 "github.com/luci/luci-go/server/auth/authtest"
20 "golang.org/x/net/context" 21 "golang.org/x/net/context"
22 "google.golang.org/grpc/codes"
21 23
22 . "github.com/luci/luci-go/common/testing/assertions" 24 . "github.com/luci/luci-go/common/testing/assertions"
23 . "github.com/smartystreets/goconvey/convey" 25 . "github.com/smartystreets/goconvey/convey"
24 ) 26 )
25 27
26 func TestArchiveStream(t *testing.T) { 28 func TestArchiveStream(t *testing.T) {
27 t.Parallel() 29 t.Parallel()
28 30
29 Convey(`With a testing configuration`, t, func() { 31 Convey(`With a testing configuration`, t, func() {
30 c := memory.Use(context.Background()) 32 c := memory.Use(context.Background())
33 c = gologger.Use(c)
31 be := Server{} 34 be := Server{}
32 35
33 c = ct.UseConfig(c, &svcconfig.Coordinator{ 36 c = ct.UseConfig(c, &svcconfig.Coordinator{
34 ServiceAuthGroup: "test-services", 37 ServiceAuthGroup: "test-services",
35 }) 38 })
36 fs := authtest.FakeState{} 39 fs := authtest.FakeState{}
37 c = auth.WithState(c, &fs) 40 c = auth.WithState(c, &fs)
38 41
39 // Register a testing log stream (not archived). 42 // Register a testing log stream (not archived).
40 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, "foo")) 43 ls := ct.TestLogStream(c, ct.TestLogStreamDescriptor(c, "foo"))
(...skipping 20 matching lines...) Expand all
61 64
62 Convey(`When logged in as a service`, func() { 65 Convey(`When logged in as a service`, func() {
63 fs.IdentityGroups = []string{"test-services"} 66 fs.IdentityGroups = []string{"test-services"}
64 67
65 Convey(`Will mark the stream as archived.`, func() { 68 Convey(`Will mark the stream as archived.`, func() {
66 _, err := be.ArchiveStream(c, req) 69 _, err := be.ArchiveStream(c, req)
67 So(err, ShouldBeNil) 70 So(err, ShouldBeNil)
68 71
69 So(ds.Get(c).Get(ls), ShouldBeNil) 72 So(ds.Get(c).Get(ls), ShouldBeNil)
70 So(ls.Archived(), ShouldBeTrue) 73 So(ls.Archived(), ShouldBeTrue)
74 So(ls.ArchiveState, ShouldEqual, coordinator.Arc hived)
71 So(ls.ArchiveWhole, ShouldBeTrue) 75 So(ls.ArchiveWhole, ShouldBeTrue)
72 So(ls.TerminalIndex, ShouldEqual, 13) 76 So(ls.TerminalIndex, ShouldEqual, 13)
73 So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake. stream") 77 So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake. stream")
74 So(ls.ArchiveStreamSize, ShouldEqual, 10) 78 So(ls.ArchiveStreamSize, ShouldEqual, 10)
75 So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.i ndex") 79 So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.i ndex")
76 So(ls.ArchiveIndexSize, ShouldEqual, 20) 80 So(ls.ArchiveIndexSize, ShouldEqual, 20)
77 So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.da ta") 81 So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.da ta")
78 So(ls.ArchiveDataSize, ShouldEqual, 30) 82 So(ls.ArchiveDataSize, ShouldEqual, 30)
79 }) 83 })
80 84
85 Convey(`Will mark the stream as partially archived if no t complete.`, func() {
86 req.Complete = false
87
88 _, err := be.ArchiveStream(c, req)
89 So(err, ShouldBeNil)
90
91 So(ds.Get(c).Get(ls), ShouldBeNil)
92 So(ls.Archived(), ShouldBeTrue)
93 So(ls.ArchiveState, ShouldEqual, coordinator.Arc hivedPartially)
94 So(ls.ArchiveWhole, ShouldBeFalse)
95 So(ls.TerminalIndex, ShouldEqual, 13)
96 So(ls.ArchiveStreamURL, ShouldEqual, "gs://fake. stream")
97 So(ls.ArchiveStreamSize, ShouldEqual, 10)
98 So(ls.ArchiveIndexURL, ShouldEqual, "gs://fake.i ndex")
99 So(ls.ArchiveIndexSize, ShouldEqual, 20)
100 So(ls.ArchiveDataURL, ShouldEqual, "gs://fake.da ta")
101 So(ls.ArchiveDataSize, ShouldEqual, 30)
102 })
103
81 Convey(`Will refuse to process an invalid stream path.`, func() { 104 Convey(`Will refuse to process an invalid stream path.`, func() {
82 req.Path = "!!!invalid!!!" 105 req.Path = "!!!invalid!!!"
83 _, err := be.ArchiveStream(c, req) 106 _, err := be.ArchiveStream(c, req)
84 So(err, ShouldBeRPCInvalidArgument, "invalid log stream path") 107 So(err, ShouldBeRPCInvalidArgument, "invalid log stream path")
85 }) 108 })
86 109
87 Convey(`If index URL is missing, will refuse to mark the stream archived.`, func() { 110 Convey(`If index URL is missing, will refuse to mark the stream archived.`, func() {
88 req.IndexUrl = "" 111 req.IndexUrl = ""
89 112
90 _, err := be.ArchiveStream(c, req) 113 _, err := be.ArchiveStream(c, req)
91 So(err, ShouldBeRPCInvalidArgument) 114 So(err, ShouldBeRPCInvalidArgument)
92 }) 115 })
93 116
94 Convey(`If stream URL is missing, will refuse to mark th e stream archived.`, func() { 117 Convey(`If stream URL is missing, will refuse to mark th e stream archived.`, func() {
95 req.StreamUrl = "" 118 req.StreamUrl = ""
96 119
97 _, err := be.ArchiveStream(c, req) 120 _, err := be.ArchiveStream(c, req)
98 So(err, ShouldBeRPCInvalidArgument) 121 So(err, ShouldBeRPCInvalidArgument)
99 }) 122 })
100 123
101 Convey(`If stream is already archived, will not update a nd return success.`, func() { 124 Convey(`If stream is already archived, will not update a nd return success.`, func() {
102 ls.State = coordinator.LSArchived 125 ls.State = coordinator.LSArchived
126 ls.ArchiveState = coordinator.Archived
103 ls.TerminalIndex = 1337 127 ls.TerminalIndex = 1337
104 So(ls.Archived(), ShouldBeTrue) 128 So(ls.Archived(), ShouldBeTrue)
105 So(ls.Put(ds.Get(c)), ShouldBeNil) 129 So(ls.Put(ds.Get(c)), ShouldBeNil)
106 130
107 _, err := be.ArchiveStream(c, req) 131 _, err := be.ArchiveStream(c, req)
108 So(err, ShouldBeNil) 132 So(err, ShouldBeNil)
109 133
110 ls.TerminalIndex = -1 // To make sure it reloade d. 134 ls.TerminalIndex = -1 // To make sure it reloade d.
111 So(ds.Get(c).Get(ls), ShouldBeNil) 135 So(ds.Get(c).Get(ls), ShouldBeNil)
112 So(ls.Archived(), ShouldBeTrue) 136 So(ls.Archived(), ShouldBeTrue)
137 So(ls.ArchiveState, ShouldEqual, coordinator.Arc hived)
113 So(ls.TerminalIndex, ShouldEqual, 1337) 138 So(ls.TerminalIndex, ShouldEqual, 1337)
114 }) 139 })
115 140
141 Convey(`If the archive has failed`, func() {
142 req.Error = true
143
144 Convey(`If the stream is below error threshold, will increment error count and return FailedPrecondition.`, func() {
145 ls.ArchiveErrors = 0
146 So(ls.ArchiveErrors, ShouldBeLessThan, m axArchiveErrors)
147 So(ls.Put(ds.Get(c)), ShouldBeNil)
148
149 _, err := be.ArchiveStream(c, req)
150 So(err, ShouldHaveRPCCode, codes.FailedP recondition)
151
152 So(ds.Get(c).Get(ls), ShouldBeNil)
153 So(ls.Archived(), ShouldBeFalse)
154 So(ls.ArchiveState, ShouldEqual, coordin ator.NotArchived)
155 So(ls.ArchiveErrors, ShouldEqual, 1)
156 })
157
158 Convey(`If the stream is above error threshold, will succeed.`, func() {
159 ls.ArchiveErrors = maxArchiveErrors
160 So(ls.Put(ds.Get(c)), ShouldBeNil)
161
162 _, err := be.ArchiveStream(c, req)
163 So(err, ShouldBeNil)
164 So(ds.Get(c).Get(ls), ShouldBeNil)
165 So(ls.Archived(), ShouldBeTrue)
166 So(ls.ArchiveState, ShouldEqual, coordin ator.ArchivedWithErrors)
167 So(ls.TerminalIndex, ShouldEqual, 13)
168 })
169 })
170
116 Convey(`When datastore Get fails, returns internal error .`, func() { 171 Convey(`When datastore Get fails, returns internal error .`, func() {
117 c, fb := featureBreaker.FilterRDS(c, nil) 172 c, fb := featureBreaker.FilterRDS(c, nil)
118 fb.BreakFeatures(errors.New("test error"), "GetM ulti") 173 fb.BreakFeatures(errors.New("test error"), "GetM ulti")
119 174
120 _, err := be.ArchiveStream(c, req) 175 _, err := be.ArchiveStream(c, req)
121 So(err, ShouldBeRPCInternal) 176 So(err, ShouldBeRPCInternal)
122 }) 177 })
123 178
124 Convey(`When datastore Put fails, returns internal error .`, func() { 179 Convey(`When datastore Put fails, returns internal error .`, func() {
125 c, fb := featureBreaker.FilterRDS(c, nil) 180 c, fb := featureBreaker.FilterRDS(c, nil)
126 fb.BreakFeatures(errors.New("test error"), "PutM ulti") 181 fb.BreakFeatures(errors.New("test error"), "PutM ulti")
127 182
128 _, err := be.ArchiveStream(c, req) 183 _, err := be.ArchiveStream(c, req)
129 So(err, ShouldBeRPCInternal) 184 So(err, ShouldBeRPCInternal)
130 }) 185 })
131 }) 186 })
132 }) 187 })
133 } 188 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/endpoints/services/archiveStream.go ('k') | appengine/logdog/coordinator/logStream.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698