| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "testing" | 10 "testing" |
| 11 "time" |
| 11 | 12 |
| 12 "github.com/luci/gae/filter/featureBreaker" | 13 "github.com/luci/gae/filter/featureBreaker" |
| 13 ds "github.com/luci/gae/service/datastore" | 14 ds "github.com/luci/gae/service/datastore" |
| 14 "github.com/luci/luci-go/appengine/logdog/coordinator" | 15 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 15 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 16 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
| 16 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" | 17 "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy" |
| 17 "github.com/luci/luci-go/appengine/tumble" | 18 "github.com/luci/luci-go/appengine/tumble" |
| 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 19 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 20 "github.com/luci/luci-go/common/clock" |
| 21 "github.com/luci/luci-go/common/clock/testclock" |
| 19 "github.com/luci/luci-go/common/logdog/types" | 22 "github.com/luci/luci-go/common/logdog/types" |
| 23 "github.com/luci/luci-go/common/proto/google" |
| 20 "github.com/luci/luci-go/common/proto/logdog/logpb" | 24 "github.com/luci/luci-go/common/proto/logdog/logpb" |
| 21 "github.com/luci/luci-go/server/auth" | 25 "github.com/luci/luci-go/server/auth" |
| 22 "github.com/luci/luci-go/server/auth/authtest" | 26 "github.com/luci/luci-go/server/auth/authtest" |
| 23 | 27 |
| 24 . "github.com/luci/luci-go/common/testing/assertions" | 28 . "github.com/luci/luci-go/common/testing/assertions" |
| 25 . "github.com/smartystreets/goconvey/convey" | 29 . "github.com/smartystreets/goconvey/convey" |
| 26 ) | 30 ) |
| 27 | 31 |
| 28 func TestRegisterStream(t *testing.T) { | 32 func TestRegisterStream(t *testing.T) { |
| 29 t.Parallel() | 33 t.Parallel() |
| 30 | 34 |
| 31 Convey(`With a testing configuration`, t, func() { | 35 Convey(`With a testing configuration`, t, func() { |
| 32 tt := &tumble.Testing{} | 36 tt := &tumble.Testing{} |
| 33 c := tt.Context() | 37 c := tt.Context() |
| 38 tc := clock.Get(c).(testclock.TestClock) |
| 39 |
| 40 // Add Tumble delayed mutation index. |
| 41 tt.EnableDelayedMutations(c) |
| 34 ds.Get(c).Testable().Consistent(true) | 42 ds.Get(c).Testable().Consistent(true) |
| 35 | 43 |
| 36 » » svcStub := ct.Services{} | 44 » » var tap ct.ArchivalPublisher |
| 45 » » svcStub := ct.Services{ |
| 46 » » » AP: func() (coordinator.ArchivalPublisher, error) { |
| 47 » » » » return &tap, nil |
| 48 » » » }, |
| 49 » » } |
| 37 svcStub.InitConfig() | 50 svcStub.InitConfig() |
| 38 svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-servi
ces" | 51 svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-servi
ces" |
| 52 svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = google.NewDu
ration(time.Hour) |
| 39 c = coordinator.WithServices(c, &svcStub) | 53 c = coordinator.WithServices(c, &svcStub) |
| 40 | 54 |
| 41 svr := New() | 55 svr := New() |
| 42 | 56 |
| 43 fs := authtest.FakeState{} | 57 fs := authtest.FakeState{} |
| 44 c = auth.WithState(c, &fs) | 58 c = auth.WithState(c, &fs) |
| 45 | 59 |
| 46 Convey(`Returns Forbidden error if not a service.`, func() { | 60 Convey(`Returns Forbidden error if not a service.`, func() { |
| 47 _, err := svr.RegisterStream(c, &logdog.RegisterStreamRe
quest{}) | 61 _, err := svr.RegisterStream(c, &logdog.RegisterStreamRe
quest{}) |
| 48 So(err, ShouldBeRPCPermissionDenied) | 62 So(err, ShouldBeRPCPermissionDenied) |
| (...skipping 16 matching lines...) Expand all Loading... |
| 65 expResp := &logdog.RegisterStreamResponse{ | 79 expResp := &logdog.RegisterStreamResponse{ |
| 66 State: &logdog.LogStreamState{ | 80 State: &logdog.LogStreamState{ |
| 67 Path: "testing/+/foo/ba
r", | 81 Path: "testing/+/foo/ba
r", |
| 68 ProtoVersion: logpb.Version, | 82 ProtoVersion: logpb.Version, |
| 69 TerminalIndex: -1, | 83 TerminalIndex: -1, |
| 70 }, | 84 }, |
| 71 Secret: secret, | 85 Secret: secret, |
| 72 } | 86 } |
| 73 | 87 |
| 74 Convey(`Can register the stream.`, func() { | 88 Convey(`Can register the stream.`, func() { |
| 89 created := ds.RoundTime(tc.Now()) |
| 90 |
| 75 resp, err := svr.RegisterStream(c, &req) | 91 resp, err := svr.RegisterStream(c, &req) |
| 76 So(err, ShouldBeRPCOK) | 92 So(err, ShouldBeRPCOK) |
| 77 So(resp, ShouldResemble, expResp) | 93 So(resp, ShouldResemble, expResp) |
| 78 ds.Get(c).Testable().CatchupIndexes() | 94 ds.Get(c).Testable().CatchupIndexes() |
| 79 tt.Drain(c) | 95 tt.Drain(c) |
| 80 | 96 |
| 97 ls := coordinator.LogStreamFromPath(type
s.StreamPath(req.Path)) |
| 98 So(ds.Get(c).Get(ls), ShouldBeNil) |
| 99 So(ls.Created, ShouldResemble, created) |
| 100 |
| 101 // No archival request yet. |
| 102 So(tap.StreamNames(), ShouldResemble, []
string{}) |
| 103 |
| 81 // Should have name components. | 104 // Should have name components. |
| 82 getNameComponents := func(b string) []st
ring { | 105 getNameComponents := func(b string) []st
ring { |
| 83 l, err := hierarchy.Get(ds.Get(c
), hierarchy.Request{Base: b}) | 106 l, err := hierarchy.Get(ds.Get(c
), hierarchy.Request{Base: b}) |
| 84 if err != nil { | 107 if err != nil { |
| 85 panic(err) | 108 panic(err) |
| 86 } | 109 } |
| 87 names := make([]string, len(l.Co
mp)) | 110 names := make([]string, len(l.Co
mp)) |
| 88 for i, e := range l.Comp { | 111 for i, e := range l.Comp { |
| 89 names[i] = e.Name | 112 names[i] = e.Name |
| 90 if e.Stream != "" { | 113 if e.Stream != "" { |
| 91 names[i] += "$" | 114 names[i] += "$" |
| 92 } | 115 } |
| 93 } | 116 } |
| 94 return names | 117 return names |
| 95 } | 118 } |
| 96 So(getNameComponents(""), ShouldResemble
, []string{"testing"}) | 119 So(getNameComponents(""), ShouldResemble
, []string{"testing"}) |
| 97 So(getNameComponents("testing"), ShouldR
esemble, []string{"+"}) | 120 So(getNameComponents("testing"), ShouldR
esemble, []string{"+"}) |
| 98 So(getNameComponents("testing/+"), Shoul
dResemble, []string{"foo"}) | 121 So(getNameComponents("testing/+"), Shoul
dResemble, []string{"foo"}) |
| 99 So(getNameComponents("testing/+/foo"), S
houldResemble, []string{"bar$"}) | 122 So(getNameComponents("testing/+/foo"), S
houldResemble, []string{"bar$"}) |
| 100 | 123 |
| 101 Convey(`Can register the stream again (i
dempotent).`, func() { | 124 Convey(`Can register the stream again (i
dempotent).`, func() { |
| 125 tc.Set(created.Add(10 * time.Min
ute)) |
| 126 |
| 102 resp, err := svr.RegisterStream(
c, &req) | 127 resp, err := svr.RegisterStream(
c, &req) |
| 103 So(err, ShouldBeRPCOK) | 128 So(err, ShouldBeRPCOK) |
| 104 So(resp, ShouldResemble, expResp
) | 129 So(resp, ShouldResemble, expResp
) |
| 130 |
| 131 ls := coordinator.LogStreamFromP
ath(types.StreamPath(req.Path)) |
| 132 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
| 133 So(ls.Created, ShouldResemble, c
reated) |
| 134 |
| 135 // No archival request yet. |
| 136 So(tap.StreamNames(), ShouldRese
mble, []string{}) |
| 137 |
| 138 Convey(`Forces an archival reque
st after first archive expiration.`, func() { |
| 139 tc.Set(created.Add(time.
Hour)) // 1 hour after initial registration. |
| 140 tt.Drain(c) |
| 141 |
| 142 So(tap.StreamNames(), Sh
ouldResemble, []string{ls.Name}) |
| 143 }) |
| 105 }) | 144 }) |
| 106 | 145 |
| 107 Convey(`Will not re-register if scerets
don't match.`, func() { | 146 Convey(`Will not re-register if scerets
don't match.`, func() { |
| 108 req.Secret[0] = 0xAB | 147 req.Secret[0] = 0xAB |
| 109 _, err := svr.RegisterStream(c,
&req) | 148 _, err := svr.RegisterStream(c,
&req) |
| 110 So(err, ShouldBeRPCAlreadyExists
, "Log stream is already incompatibly registered") | 149 So(err, ShouldBeRPCAlreadyExists
, "Log stream is already incompatibly registered") |
| 111 }) | 150 }) |
| 112 | 151 |
| 113 Convey(`Will not re-register if descript
or data differs.`, func() { | 152 Convey(`Will not re-register if descript
or data differs.`, func() { |
| 114 req.Desc.Tags = map[string]strin
g{ | 153 req.Desc.Tags = map[string]strin
g{ |
| (...skipping 68 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 183 So(req.Desc.Validate(true), Shou
ldNotBeNil) | 222 So(req.Desc.Validate(true), Shou
ldNotBeNil) |
| 184 | 223 |
| 185 _, err := svr.RegisterStream(c,
&req) | 224 _, err := svr.RegisterStream(c,
&req) |
| 186 So(err, ShouldBeRPCInvalidArgume
nt, "Invalid log stream descriptor") | 225 So(err, ShouldBeRPCInvalidArgume
nt, "Invalid log stream descriptor") |
| 187 }) | 226 }) |
| 188 }) | 227 }) |
| 189 }) | 228 }) |
| 190 }) | 229 }) |
| 191 }) | 230 }) |
| 192 } | 231 } |
| OLD | NEW |