Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(16)

Side by Side Diff: appengine/logdog/coordinator/endpoints/services/terminateStream_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Minor fixes, works in dev now. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package services 5 package services
6 6
7 import ( 7 import (
8 "errors" 8 "errors"
9 "testing" 9 "testing"
10 "time" 10 "time"
11 11
12 "github.com/luci/gae/filter/featureBreaker" 12 "github.com/luci/gae/filter/featureBreaker"
13 "github.com/luci/gae/impl/memory" 13 "github.com/luci/gae/impl/memory"
14 ds "github.com/luci/gae/service/datastore" 14 ds "github.com/luci/gae/service/datastore"
15 tq "github.com/luci/gae/service/taskqueue"
15 "github.com/luci/luci-go/appengine/logdog/coordinator" 16 "github.com/luci/luci-go/appengine/logdog/coordinator"
16 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest " 17 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
17 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1" 18 "github.com/luci/luci-go/common/api/logdog_coordinator/services/v1"
18 "github.com/luci/luci-go/common/clock/testclock" 19 "github.com/luci/luci-go/common/clock/testclock"
20 "github.com/luci/luci-go/common/proto/google"
19 "github.com/luci/luci-go/common/proto/logdog/svcconfig" 21 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
20 "github.com/luci/luci-go/server/auth" 22 "github.com/luci/luci-go/server/auth"
21 "github.com/luci/luci-go/server/auth/authtest" 23 "github.com/luci/luci-go/server/auth/authtest"
22 "golang.org/x/net/context" 24 "golang.org/x/net/context"
23 25
24 . "github.com/luci/luci-go/common/testing/assertions" 26 . "github.com/luci/luci-go/common/testing/assertions"
25 . "github.com/smartystreets/goconvey/convey" 27 . "github.com/smartystreets/goconvey/convey"
26 ) 28 )
27 29
28 func TestTerminateStream(t *testing.T) { 30 func TestTerminateStream(t *testing.T) {
29 t.Parallel() 31 t.Parallel()
30 32
31 Convey(`With a testing configuration`, t, func() { 33 Convey(`With a testing configuration`, t, func() {
32 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal) 34 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal)
33 c = memory.Use(c) 35 c = memory.Use(c)
34 be := Server{} 36 be := Server{}
35 37
36 desc := ct.TestLogStreamDescriptor(c, "foo/bar") 38 desc := ct.TestLogStreamDescriptor(c, "foo/bar")
37 ls := ct.TestLogStream(c, desc) 39 ls := ct.TestLogStream(c, desc)
38 40
39 req := logdog.TerminateStreamRequest{ 41 req := logdog.TerminateStreamRequest{
40 Path: "testing/+/foo/bar", 42 Path: "testing/+/foo/bar",
41 Secret: ls.Secret, 43 Secret: ls.Secret,
42 TerminalIndex: 1337, 44 TerminalIndex: 1337,
43 } 45 }
44 46
45 » » c = ct.UseConfig(c, &svcconfig.Coordinator{ 47 » » ccfg := svcconfig.Coordinator{
46 » » » ServiceAuthGroup: "test-services", 48 » » » ServiceAuthGroup: "test-services",
47 » » }) 49 » » » ArchiveTaskQueue: "archive-task-queue",
50 » » » ArchiveSettleDelay: google.NewDuration(10 * time.Second) ,
51 » » » ArchiveDelayMax: google.NewDuration(24 * time.Hour),
52 » » }
53 » » c = ct.UseConfig(c, &ccfg)
54
55 » » // Create an archival task queue.
56 » » tq.Get(c).Testable().CreateQueue(ccfg.ArchiveTaskQueue)
57 » » archiveTasks := func() []string {
58 » » » tasks, err := ct.GetArchiveTaskStreams(tq.Get(c), ccfg.A rchiveTaskQueue)
59 » » » if err != nil {
60 » » » » panic(err)
61 » » » }
62 » » » return tasks
63 » » }
64
48 fs := authtest.FakeState{} 65 fs := authtest.FakeState{}
49 c = auth.WithState(c, &fs) 66 c = auth.WithState(c, &fs)
50 67
51 Convey(`Returns Forbidden error if not a service.`, func() { 68 Convey(`Returns Forbidden error if not a service.`, func() {
52 _, err := be.TerminateStream(c, &req) 69 _, err := be.TerminateStream(c, &req)
53 So(err, ShouldBeRPCPermissionDenied) 70 So(err, ShouldBeRPCPermissionDenied)
54 }) 71 })
55 72
56 Convey(`When logged in as a service`, func() { 73 Convey(`When logged in as a service`, func() {
57 fs.IdentityGroups = []string{"test-services"} 74 fs.IdentityGroups = []string{"test-services"}
58 75
59 Convey(`A non-terminal registered stream, "testing/+/foo /bar"`, func() { 76 Convey(`A non-terminal registered stream, "testing/+/foo /bar"`, func() {
60 » » » » So(ls.Put(ds.Get(c)), ShouldBeNil) 77 » » » » So(ds.Get(c).Put(ls), ShouldBeNil)
61 » » » » tc.Add(time.Second)
62 78
63 » » » » Convey(`Can be marked terminal.`, func() { 79 » » » » Convey(`Can be marked terminal and schedules an archival task.`, func() {
64 _, err := be.TerminateStream(c, &req) 80 _, err := be.TerminateStream(c, &req)
65 So(err, ShouldBeRPCOK) 81 So(err, ShouldBeRPCOK)
66 82
67 // Reload "ls" and confirm. 83 // Reload "ls" and confirm.
68 So(ds.Get(c).Get(ls), ShouldBeNil) 84 So(ds.Get(c).Get(ls), ShouldBeNil)
69 So(ls.TerminalIndex, ShouldEqual, 1337) 85 So(ls.TerminalIndex, ShouldEqual, 1337)
70 » » » » » So(ls.State, ShouldEqual, coordinator.LS Terminated) 86 » » » » » So(ls.State, ShouldEqual, coordinator.LS ArchiveTasked)
71 » » » » » So(ls.Updated, ShouldResemble, ls.Create d.Add(time.Second)) 87 » » » » » So(ls.Terminated(), ShouldBeTrue)
88 » » » » » So(archiveTasks(), ShouldResemble, []str ing{ls.Name})
89
90 » » » » » // Assert that all archive tasks are sch eduled ArchiveSettleDelay in
91 » » » » » // the future.
92 » » » » » for _, t := range tq.Get(c).Testable().G etScheduledTasks()[ccfg.ArchiveTaskQueue] {
93 » » » » » » So(t.ETA.After(tc.Now()), Should BeTrue)
94 » » » » » }
72 95
73 Convey(`Can be marked terminal again (id empotent).`, func() { 96 Convey(`Can be marked terminal again (id empotent).`, func() {
74 _, err := be.TerminateStream(c, &req) 97 _, err := be.TerminateStream(c, &req)
75 So(err, ShouldBeRPCOK) 98 So(err, ShouldBeRPCOK)
76 99
77 // Reload "ls" and confirm. 100 // Reload "ls" and confirm.
78 So(ds.Get(c).Get(ls), ShouldBeNi l) 101 So(ds.Get(c).Get(ls), ShouldBeNi l)
102
103 So(ls.Terminated(), ShouldBeTrue )
79 So(ls.TerminalIndex, ShouldEqual , 1337) 104 So(ls.TerminalIndex, ShouldEqual , 1337)
80 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSTerminated) 105 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked)
106 » » » » » » So(archiveTasks(), ShouldResembl e, []string{ls.Name})
81 }) 107 })
82 108
83 Convey(`Will reject attempts to change t he terminal index.`, func() { 109 Convey(`Will reject attempts to change t he terminal index.`, func() {
84 req.TerminalIndex = 1338 110 req.TerminalIndex = 1338
85 _, err := be.TerminateStream(c, &req) 111 _, err := be.TerminateStream(c, &req)
86 » » » » » » So(err, ShouldBeRPCAlreadyExists , "Terminal index is already set") 112 » » » » » » So(err, ShouldBeRPCFailedPrecond ition, "Log stream is not in streaming state.")
87 113
88 // Reload "ls" and confirm. 114 // Reload "ls" and confirm.
89 So(ds.Get(c).Get(ls), ShouldBeNi l) 115 So(ds.Get(c).Get(ls), ShouldBeNi l)
90 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSTerminated) 116
117 » » » » » » So(ls.Terminated(), ShouldBeTrue )
118 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked)
91 So(ls.TerminalIndex, ShouldEqual , 1337) 119 So(ls.TerminalIndex, ShouldEqual , 1337)
120 So(archiveTasks(), ShouldResembl e, []string{ls.Name})
92 }) 121 })
93 122
94 Convey(`Will reject attempts to clear th e terminal index.`, func() { 123 Convey(`Will reject attempts to clear th e terminal index.`, func() {
95 req.TerminalIndex = -1 124 req.TerminalIndex = -1
96 _, err := be.TerminateStream(c, &req) 125 _, err := be.TerminateStream(c, &req)
97 So(err, ShouldBeRPCInvalidArgume nt, "Negative terminal index.") 126 So(err, ShouldBeRPCInvalidArgume nt, "Negative terminal index.")
98 127
99 // Reload "ls" and confirm. 128 // Reload "ls" and confirm.
100 So(ds.Get(c).Get(ls), ShouldBeNi l) 129 So(ds.Get(c).Get(ls), ShouldBeNi l)
101 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSTerminated) 130
131 » » » » » » So(ls.Terminated(), ShouldBeTrue )
132 » » » » » » So(ls.State, ShouldEqual, coordi nator.LSArchiveTasked)
102 So(ls.TerminalIndex, ShouldEqual , 1337) 133 So(ls.TerminalIndex, ShouldEqual , 1337)
134 So(archiveTasks(), ShouldResembl e, []string{ls.Name})
103 }) 135 })
104 }) 136 })
105 137
106 Convey(`Will return an internal server error if Put() fails.`, func() { 138 Convey(`Will return an internal server error if Put() fails.`, func() {
107 c, fb := featureBreaker.FilterRDS(c, nil ) 139 c, fb := featureBreaker.FilterRDS(c, nil )
108 fb.BreakFeatures(errors.New("test error" ), "PutMulti") 140 fb.BreakFeatures(errors.New("test error" ), "PutMulti")
109 _, err := be.TerminateStream(c, &req) 141 _, err := be.TerminateStream(c, &req)
110 So(err, ShouldBeRPCInternal) 142 So(err, ShouldBeRPCInternal)
111 }) 143 })
112 144
(...skipping 17 matching lines...) Expand all
130 So(err, ShouldBeRPCInvalidArgument, "Invalid pat h") 162 So(err, ShouldBeRPCInvalidArgument, "Invalid pat h")
131 }) 163 })
132 164
133 Convey(`Will fail if the stream is not registered.`, fun c() { 165 Convey(`Will fail if the stream is not registered.`, fun c() {
134 _, err := be.TerminateStream(c, &req) 166 _, err := be.TerminateStream(c, &req)
135 So(err, ShouldBeRPCNotFound, "is not registered" ) 167 So(err, ShouldBeRPCNotFound, "is not registered" )
136 }) 168 })
137 }) 169 })
138 }) 170 })
139 } 171 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698