| Index: appengine/logdog/coordinator/endpoints/services/registerStream_test.go
|
| diff --git a/appengine/logdog/coordinator/endpoints/services/registerStream_test.go b/appengine/logdog/coordinator/endpoints/services/registerStream_test.go
|
| index 72d5f3334fe7857f3bf45b343cf363691c1c9423..1ac82c7fa71deec7f7ccc30b9ed3cef478e99890 100644
|
| --- a/appengine/logdog/coordinator/endpoints/services/registerStream_test.go
|
| +++ b/appengine/logdog/coordinator/endpoints/services/registerStream_test.go
|
| @@ -8,6 +8,7 @@ import (
|
| "bytes"
|
| "errors"
|
| "testing"
|
| + "time"
|
|
|
| "github.com/luci/gae/filter/featureBreaker"
|
| ds "github.com/luci/gae/service/datastore"
|
| @@ -16,7 +17,10 @@ import (
|
| "github.com/luci/luci-go/appengine/logdog/coordinator/hierarchy"
|
| "github.com/luci/luci-go/appengine/tumble"
|
| "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
|
| + "github.com/luci/luci-go/common/clock"
|
| + "github.com/luci/luci-go/common/clock/testclock"
|
| "github.com/luci/luci-go/common/logdog/types"
|
| + "github.com/luci/luci-go/common/proto/google"
|
| "github.com/luci/luci-go/common/proto/logdog/logpb"
|
| "github.com/luci/luci-go/server/auth"
|
| "github.com/luci/luci-go/server/auth/authtest"
|
| @@ -31,11 +35,21 @@ func TestRegisterStream(t *testing.T) {
|
| Convey(`With a testing configuration`, t, func() {
|
| tt := &tumble.Testing{}
|
| c := tt.Context()
|
| + tc := clock.Get(c).(testclock.TestClock)
|
| +
|
| + // Add Tumble delayed mutation index.
|
| + tt.EnableDelayedMutations(c)
|
| ds.Get(c).Testable().Consistent(true)
|
|
|
| - svcStub := ct.Services{}
|
| + var tap ct.ArchivalPublisher
|
| + svcStub := ct.Services{
|
| + AP: func() (coordinator.ArchivalPublisher, error) {
|
| + return &tap, nil
|
| + },
|
| + }
|
| svcStub.InitConfig()
|
| svcStub.ServiceConfig.Coordinator.ServiceAuthGroup = "test-services"
|
| + svcStub.ServiceConfig.Coordinator.ArchiveDelayMax = google.NewDuration(time.Hour)
|
| c = coordinator.WithServices(c, &svcStub)
|
|
|
| svr := New()
|
| @@ -72,12 +86,21 @@ func TestRegisterStream(t *testing.T) {
|
| }
|
|
|
| Convey(`Can register the stream.`, func() {
|
| + created := ds.RoundTime(tc.Now())
|
| +
|
| resp, err := svr.RegisterStream(c, &req)
|
| So(err, ShouldBeRPCOK)
|
| So(resp, ShouldResemble, expResp)
|
| ds.Get(c).Testable().CatchupIndexes()
|
| tt.Drain(c)
|
|
|
| + ls := coordinator.LogStreamFromPath(types.StreamPath(req.Path))
|
| + So(ds.Get(c).Get(ls), ShouldBeNil)
|
| + So(ls.Created, ShouldResemble, created)
|
| +
|
| + // No archival request yet.
|
| + So(tap.StreamNames(), ShouldResemble, []string{})
|
| +
|
| // Should have name components.
|
| getNameComponents := func(b string) []string {
|
| l, err := hierarchy.Get(ds.Get(c), hierarchy.Request{Base: b})
|
| @@ -99,9 +122,25 @@ func TestRegisterStream(t *testing.T) {
|
| So(getNameComponents("testing/+/foo"), ShouldResemble, []string{"bar$"})
|
|
|
| Convey(`Can register the stream again (idempotent).`, func() {
|
| + tc.Set(created.Add(10 * time.Minute))
|
| +
|
| resp, err := svr.RegisterStream(c, &req)
|
| So(err, ShouldBeRPCOK)
|
| So(resp, ShouldResemble, expResp)
|
| +
|
| + ls := coordinator.LogStreamFromPath(types.StreamPath(req.Path))
|
| + So(ds.Get(c).Get(ls), ShouldBeNil)
|
| + So(ls.Created, ShouldResemble, created)
|
| +
|
| + // No archival request yet.
|
| + So(tap.StreamNames(), ShouldResemble, []string{})
|
| +
|
| + Convey(`Forces an archival request after first archive expiration.`, func() {
|
| + tc.Set(created.Add(time.Hour)) // 1 hour after initial registration.
|
| + tt.Drain(c)
|
| +
|
| + So(tap.StreamNames(), ShouldResemble, []string{ls.Name})
|
| + })
|
| })
|
|
|
| Convey(`Will not re-register if scerets don't match.`, func() {
|
|
|