| OLD | NEW |
| 1 // Copyright 2015 The Chromium Authors. All rights reserved. | 1 // Copyright 2015 The Chromium Authors. All rights reserved. |
| 2 // Use of this source code is governed by a BSD-style license that can be | 2 // Use of this source code is governed by a BSD-style license that can be |
| 3 // found in the LICENSE file. | 3 // found in the LICENSE file. |
| 4 | 4 |
| 5 package services | 5 package services |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "errors" | 8 "errors" |
| 9 "testing" | 9 "testing" |
| 10 "time" | 10 "time" |
| 11 | 11 |
| 12 "github.com/luci/gae/filter/featureBreaker" | 12 "github.com/luci/gae/filter/featureBreaker" |
| 13 "github.com/luci/gae/impl/memory" | 13 "github.com/luci/gae/impl/memory" |
| 14 ds "github.com/luci/gae/service/datastore" | 14 ds "github.com/luci/gae/service/datastore" |
| 15 tq "github.com/luci/gae/service/taskqueue" |
| 15 "github.com/luci/luci-go/appengine/logdog/coordinator" | 16 "github.com/luci/luci-go/appengine/logdog/coordinator" |
| 16 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" | 17 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest
" |
| 17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" | 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" |
| 18 "github.com/luci/luci-go/common/clock/testclock" | 19 "github.com/luci/luci-go/common/clock/testclock" |
| 20 "github.com/luci/luci-go/common/proto/google" |
| 19 "github.com/luci/luci-go/common/proto/logdog/svcconfig" | 21 "github.com/luci/luci-go/common/proto/logdog/svcconfig" |
| 20 "github.com/luci/luci-go/server/auth" | 22 "github.com/luci/luci-go/server/auth" |
| 21 "github.com/luci/luci-go/server/auth/authtest" | 23 "github.com/luci/luci-go/server/auth/authtest" |
| 22 "golang.org/x/net/context" | 24 "golang.org/x/net/context" |
| 23 | 25 |
| 24 . "github.com/luci/luci-go/common/testing/assertions" | 26 . "github.com/luci/luci-go/common/testing/assertions" |
| 25 . "github.com/smartystreets/goconvey/convey" | 27 . "github.com/smartystreets/goconvey/convey" |
| 26 ) | 28 ) |
| 27 | 29 |
| 28 func TestTerminateStream(t *testing.T) { | 30 func TestTerminateStream(t *testing.T) { |
| 29 t.Parallel() | 31 t.Parallel() |
| 30 | 32 |
| 31 Convey(`With a testing configuration`, t, func() { | 33 Convey(`With a testing configuration`, t, func() { |
| 32 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) | 34 c, tc := testclock.UseTime(context.Background(), testclock.TestT
imeLocal) |
| 33 c = memory.Use(c) | 35 c = memory.Use(c) |
| 34 be := Server{} | 36 be := Server{} |
| 35 | 37 |
| 36 desc := ct.TestLogStreamDescriptor(c, "foo/bar") | 38 desc := ct.TestLogStreamDescriptor(c, "foo/bar") |
| 37 ls := ct.TestLogStream(c, desc) | 39 ls := ct.TestLogStream(c, desc) |
| 38 | 40 |
| 39 req := logdog.TerminateStreamRequest{ | 41 req := logdog.TerminateStreamRequest{ |
| 40 Path: "testing/+/foo/bar", | 42 Path: "testing/+/foo/bar", |
| 41 Secret: ls.Secret, | 43 Secret: ls.Secret, |
| 42 TerminalIndex: 1337, | 44 TerminalIndex: 1337, |
| 43 } | 45 } |
| 44 | 46 |
| 45 » » c = ct.UseConfig(c, &svcconfig.Coordinator{ | 47 » » ccfg := svcconfig.Coordinator{ |
| 46 » » » ServiceAuthGroup: "test-services", | 48 » » » ServiceAuthGroup: "test-services", |
| 47 » » }) | 49 » » » ArchiveTaskQueue: "archive-task-queue", |
| 50 » » » ArchiveSettleDelay: google.NewDuration(10 * time.Second)
, |
| 51 » » » ArchiveDelayMax: google.NewDuration(24 * time.Hour), |
| 52 » » } |
| 53 » » c = ct.UseConfig(c, &ccfg) |
| 54 |
| 55 » » // Create an archival task queue. |
| 56 » » tq.Get(c).Testable().CreateQueue(ccfg.ArchiveTaskQueue) |
| 57 » » archiveTasks := func() []string { |
| 58 » » » tasks, err := ct.GetArchiveTaskStreams(tq.Get(c), ccfg.A
rchiveTaskQueue) |
| 59 » » » if err != nil { |
| 60 » » » » panic(err) |
| 61 » » » } |
| 62 » » » return tasks |
| 63 » » } |
| 64 |
| 48 fs := authtest.FakeState{} | 65 fs := authtest.FakeState{} |
| 49 c = auth.WithState(c, &fs) | 66 c = auth.WithState(c, &fs) |
| 50 | 67 |
| 51 Convey(`Returns Forbidden error if not a service.`, func() { | 68 Convey(`Returns Forbidden error if not a service.`, func() { |
| 52 _, err := be.TerminateStream(c, &req) | 69 _, err := be.TerminateStream(c, &req) |
| 53 So(err, ShouldBeRPCPermissionDenied) | 70 So(err, ShouldBeRPCPermissionDenied) |
| 54 }) | 71 }) |
| 55 | 72 |
| 56 Convey(`When logged in as a service`, func() { | 73 Convey(`When logged in as a service`, func() { |
| 57 fs.IdentityGroups = []string{"test-services"} | 74 fs.IdentityGroups = []string{"test-services"} |
| 58 | 75 |
| 59 Convey(`A non-terminal registered stream, "testing/+/foo
/bar"`, func() { | 76 Convey(`A non-terminal registered stream, "testing/+/foo
/bar"`, func() { |
| 60 » » » » So(ls.Put(ds.Get(c)), ShouldBeNil) | 77 » » » » So(ds.Get(c).Put(ls), ShouldBeNil) |
| 61 » » » » tc.Add(time.Second) | |
| 62 | 78 |
| 63 » » » » Convey(`Can be marked terminal.`, func() { | 79 » » » » Convey(`Can be marked terminal and schedules an
archival task.`, func() { |
| 64 _, err := be.TerminateStream(c, &req) | 80 _, err := be.TerminateStream(c, &req) |
| 65 So(err, ShouldBeRPCOK) | 81 So(err, ShouldBeRPCOK) |
| 66 | 82 |
| 67 // Reload "ls" and confirm. | 83 // Reload "ls" and confirm. |
| 68 So(ds.Get(c).Get(ls), ShouldBeNil) | 84 So(ds.Get(c).Get(ls), ShouldBeNil) |
| 69 So(ls.TerminalIndex, ShouldEqual, 1337) | 85 So(ls.TerminalIndex, ShouldEqual, 1337) |
| 70 » » » » » So(ls.State, ShouldEqual, coordinator.LS
Terminated) | 86 » » » » » So(ls.State, ShouldEqual, coordinator.LS
ArchiveTasked) |
| 71 » » » » » So(ls.Updated, ShouldResemble, ls.Create
d.Add(time.Second)) | 87 » » » » » So(ls.Terminated(), ShouldBeTrue) |
| 88 » » » » » So(archiveTasks(), ShouldResemble, []str
ing{ls.Name}) |
| 89 |
| 90 » » » » » // Assert that all archive tasks are sch
eduled ArchiveSettleDelay in |
| 91 » » » » » // the future. |
| 92 » » » » » for _, t := range tq.Get(c).Testable().G
etScheduledTasks()[ccfg.ArchiveTaskQueue] { |
| 93 » » » » » » So(t.ETA.After(tc.Now()), Should
BeTrue) |
| 94 » » » » » } |
| 72 | 95 |
| 73 Convey(`Can be marked terminal again (id
empotent).`, func() { | 96 Convey(`Can be marked terminal again (id
empotent).`, func() { |
| 74 _, err := be.TerminateStream(c,
&req) | 97 _, err := be.TerminateStream(c,
&req) |
| 75 So(err, ShouldBeRPCOK) | 98 So(err, ShouldBeRPCOK) |
| 76 | 99 |
| 77 // Reload "ls" and confirm. | 100 // Reload "ls" and confirm. |
| 78 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 101 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
| 102 |
| 103 So(ls.Terminated(), ShouldBeTrue
) |
| 79 So(ls.TerminalIndex, ShouldEqual
, 1337) | 104 So(ls.TerminalIndex, ShouldEqual
, 1337) |
| 80 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSTerminated) | 105 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
| 106 » » » » » » So(archiveTasks(), ShouldResembl
e, []string{ls.Name}) |
| 81 }) | 107 }) |
| 82 | 108 |
| 83 Convey(`Will reject attempts to change t
he terminal index.`, func() { | 109 Convey(`Will reject attempts to change t
he terminal index.`, func() { |
| 84 req.TerminalIndex = 1338 | 110 req.TerminalIndex = 1338 |
| 85 _, err := be.TerminateStream(c,
&req) | 111 _, err := be.TerminateStream(c,
&req) |
| 86 » » » » » » So(err, ShouldBeRPCAlreadyExists
, "Terminal index is already set") | 112 » » » » » » So(err, ShouldBeRPCFailedPrecond
ition, "Log stream is not in streaming state.") |
| 87 | 113 |
| 88 // Reload "ls" and confirm. | 114 // Reload "ls" and confirm. |
| 89 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 115 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
| 90 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSTerminated) | 116 |
| 117 » » » » » » So(ls.Terminated(), ShouldBeTrue
) |
| 118 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
| 91 So(ls.TerminalIndex, ShouldEqual
, 1337) | 119 So(ls.TerminalIndex, ShouldEqual
, 1337) |
| 120 So(archiveTasks(), ShouldResembl
e, []string{ls.Name}) |
| 92 }) | 121 }) |
| 93 | 122 |
| 94 Convey(`Will reject attempts to clear th
e terminal index.`, func() { | 123 Convey(`Will reject attempts to clear th
e terminal index.`, func() { |
| 95 req.TerminalIndex = -1 | 124 req.TerminalIndex = -1 |
| 96 _, err := be.TerminateStream(c,
&req) | 125 _, err := be.TerminateStream(c,
&req) |
| 97 So(err, ShouldBeRPCInvalidArgume
nt, "Negative terminal index.") | 126 So(err, ShouldBeRPCInvalidArgume
nt, "Negative terminal index.") |
| 98 | 127 |
| 99 // Reload "ls" and confirm. | 128 // Reload "ls" and confirm. |
| 100 So(ds.Get(c).Get(ls), ShouldBeNi
l) | 129 So(ds.Get(c).Get(ls), ShouldBeNi
l) |
| 101 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSTerminated) | 130 |
| 131 » » » » » » So(ls.Terminated(), ShouldBeTrue
) |
| 132 » » » » » » So(ls.State, ShouldEqual, coordi
nator.LSArchiveTasked) |
| 102 So(ls.TerminalIndex, ShouldEqual
, 1337) | 133 So(ls.TerminalIndex, ShouldEqual
, 1337) |
| 134 So(archiveTasks(), ShouldResembl
e, []string{ls.Name}) |
| 103 }) | 135 }) |
| 104 }) | 136 }) |
| 105 | 137 |
| 106 Convey(`Will return an internal server error if
Put() fails.`, func() { | 138 Convey(`Will return an internal server error if
Put() fails.`, func() { |
| 107 c, fb := featureBreaker.FilterRDS(c, nil
) | 139 c, fb := featureBreaker.FilterRDS(c, nil
) |
| 108 fb.BreakFeatures(errors.New("test error"
), "PutMulti") | 140 fb.BreakFeatures(errors.New("test error"
), "PutMulti") |
| 109 _, err := be.TerminateStream(c, &req) | 141 _, err := be.TerminateStream(c, &req) |
| 110 So(err, ShouldBeRPCInternal) | 142 So(err, ShouldBeRPCInternal) |
| 111 }) | 143 }) |
| 112 | 144 |
| (...skipping 17 matching lines...) Expand all Loading... |
| 130 So(err, ShouldBeRPCInvalidArgument, "Invalid pat
h") | 162 So(err, ShouldBeRPCInvalidArgument, "Invalid pat
h") |
| 131 }) | 163 }) |
| 132 | 164 |
| 133 Convey(`Will fail if the stream is not registered.`, fun
c() { | 165 Convey(`Will fail if the stream is not registered.`, fun
c() { |
| 134 _, err := be.TerminateStream(c, &req) | 166 _, err := be.TerminateStream(c, &req) |
| 135 So(err, ShouldBeRPCNotFound, "is not registered"
) | 167 So(err, ShouldBeRPCNotFound, "is not registered"
) |
| 136 }) | 168 }) |
| 137 }) | 169 }) |
| 138 }) | 170 }) |
| 139 } | 171 } |
| OLD | NEW |