Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(413)

Side by Side Diff: appengine/logdog/coordinator/endpoints/logs/get_test.go

Issue 1863973002: LogDog: Update to archival V2. (Closed) Base URL: https://github.com/luci/luci-go@grpcutil-errors
Patch Set: Fix proto comment. Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The Chromium Authors. All rights reserved. 1 // Copyright 2015 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be 2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. 3 // found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 "io" 11 "io"
12 "testing" 12 "testing"
13 "time" 13 "time"
14 14
15 "github.com/golang/protobuf/proto" 15 "github.com/golang/protobuf/proto"
16 "github.com/luci/gae/filter/featureBreaker" 16 "github.com/luci/gae/filter/featureBreaker"
17 "github.com/luci/gae/impl/memory" 17 "github.com/luci/gae/impl/memory"
18 ds "github.com/luci/gae/service/datastore" 18 ds "github.com/luci/gae/service/datastore"
19 "github.com/luci/luci-go/appengine/logdog/coordinator" 19 "github.com/luci/luci-go/appengine/logdog/coordinator"
20 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest " 20 ct "github.com/luci/luci-go/appengine/logdog/coordinator/coordinatorTest "
21 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1" 21 "github.com/luci/luci-go/common/api/logdog_coordinator/logs/v1"
22 "github.com/luci/luci-go/common/clock/testclock" 22 "github.com/luci/luci-go/common/clock/testclock"
23 "github.com/luci/luci-go/common/gcloud/gs" 23 "github.com/luci/luci-go/common/gcloud/gs"
24 "github.com/luci/luci-go/common/iotools" 24 "github.com/luci/luci-go/common/iotools"
25 "github.com/luci/luci-go/common/logdog/types" 25 "github.com/luci/luci-go/common/logdog/types"
26 "github.com/luci/luci-go/common/proto/logdog/logpb" 26 "github.com/luci/luci-go/common/proto/logdog/logpb"
27 "github.com/luci/luci-go/common/proto/logdog/svcconfig"
28 "github.com/luci/luci-go/common/recordio" 27 "github.com/luci/luci-go/common/recordio"
29 "github.com/luci/luci-go/server/auth" 28 "github.com/luci/luci-go/server/auth"
30 "github.com/luci/luci-go/server/auth/authtest" 29 "github.com/luci/luci-go/server/auth/authtest"
31 "github.com/luci/luci-go/server/logdog/archive" 30 "github.com/luci/luci-go/server/logdog/archive"
32 "github.com/luci/luci-go/server/logdog/storage" 31 "github.com/luci/luci-go/server/logdog/storage"
33 memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory" 32 memoryStorage "github.com/luci/luci-go/server/logdog/storage/memory"
34 "golang.org/x/net/context" 33 "golang.org/x/net/context"
35 34
36 . "github.com/luci/luci-go/common/testing/assertions" 35 . "github.com/luci/luci-go/common/testing/assertions"
37 . "github.com/smartystreets/goconvey/convey" 36 . "github.com/smartystreets/goconvey/convey"
38 ) 37 )
39 38
40 type staticArchiveSource []*logpb.LogEntry 39 type staticArchiveSource []*logpb.LogEntry
41 40
42 func (s *staticArchiveSource) NextLogEntry() (le *logpb.LogEntry, err error) { 41 func (s *staticArchiveSource) NextLogEntry() (le *logpb.LogEntry, err error) {
43 if len(*s) == 0 { 42 if len(*s) == 0 {
44 err = archive.ErrEndOfStream 43 err = archive.ErrEndOfStream
45 } else { 44 } else {
46 le, *s = (*s)[0], (*s)[1:] 45 le, *s = (*s)[0], (*s)[1:]
47 } 46 }
48 return 47 return
49 } 48 }
50 49
51 type testGSClient map[string][]byte 50 type testGSClient map[gs.Path][]byte
52 51
53 func (c testGSClient) key(bucket, relpath string) string { 52 func (c testGSClient) put(path gs.Path, d []byte) {
54 » return fmt.Sprintf("%s/%s", bucket, relpath) 53 » c[path] = d
55 } 54 }
56 55
57 func (c testGSClient) put(bucket, relpath string, d []byte) { 56 func (c testGSClient) get(path gs.Path) []byte {
58 » c[c.key(bucket, relpath)] = d 57 » return c[path]
59 }
60
61 func (c testGSClient) get(bucket, relpath string) []byte {
62 » return c[c.key(bucket, relpath)]
63 } 58 }
64 59
65 func (c testGSClient) Close() error { return nil } 60 func (c testGSClient) Close() error { return nil }
66 func (c testGSClient) NewWriter(string, string) (gs.Writer, error) { 61 func (c testGSClient) NewWriter(gs.Path) (gs.Writer, error) {
67 return nil, errors.New("not implemented") 62 return nil, errors.New("not implemented")
68 } 63 }
69 func (c testGSClient) Delete(string, string) error { return errors.New("not impl emented") } 64 func (c testGSClient) Rename(gs.Path, gs.Path) error { return errors.New("not im plemented") }
65 func (c testGSClient) Delete(gs.Path) error { return errors.New("not im plemented") }
70 66
71 func (c testGSClient) NewReader(bucket, relpath string, o gs.Options) (io.ReadCl oser, error) { 67 func (c testGSClient) NewReader(path gs.Path, o gs.Options) (io.ReadCloser, erro r) {
72 if d, ok := c["error"]; ok { 68 if d, ok := c["error"]; ok {
73 return nil, errors.New(string(d)) 69 return nil, errors.New(string(d))
74 } 70 }
75 71
76 » d, ok := c[c.key(bucket, relpath)] 72 » d, ok := c[path]
77 if !ok { 73 if !ok {
78 return nil, errors.New("does not exist") 74 return nil, errors.New("does not exist")
79 } 75 }
80 76
81 to := int(o.To) 77 to := int(o.To)
82 if to == 0 { 78 if to == 0 {
83 to = len(d) 79 to = len(d)
84 } 80 }
85 d = d[int(o.From):to] 81 d = d[int(o.From):to]
86 82
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
148 func TestGet(t *testing.T) { 144 func TestGet(t *testing.T) {
149 t.Parallel() 145 t.Parallel()
150 146
151 Convey(`With a testing configuration, a Get request`, t, func() { 147 Convey(`With a testing configuration, a Get request`, t, func() {
152 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal) 148 c, tc := testclock.UseTime(context.Background(), testclock.TestT imeLocal)
153 c = memory.Use(c) 149 c = memory.Use(c)
154 150
155 fs := authtest.FakeState{} 151 fs := authtest.FakeState{}
156 c = auth.WithState(c, &fs) 152 c = auth.WithState(c, &fs)
157 153
158 » » c = ct.UseConfig(c, &svcconfig.Coordinator{ 154 » » ms := memoryStorage.Storage{}
159 » » » AdminAuthGroup: "test-administrators", 155 » » gsc := testGSClient{}
160 » » }) 156 » » svcStub := ct.Services{
157 » » » IS: func() (storage.Storage, error) {
158 » » » » return &ms, nil
159 » » » },
160 » » » GS: func() (gs.Client, error) {
161 » » » » return gsc, nil
162 » » » },
163 » » }
164 » » svcStub.InitConfig()
165 » » svcStub.ServiceConfig.Coordinator.AdminAuthGroup = "test-adminis trators"
166
167 » » s := Server{
168 » » » ServiceBase: coordinator.ServiceBase{&svcStub},
169 » » }
161 170
162 // Generate our test stream. 171 // Generate our test stream.
163 desc := ct.TestLogStreamDescriptor(c, "foo/bar") 172 desc := ct.TestLogStreamDescriptor(c, "foo/bar")
164 ls := ct.TestLogStream(c, desc) 173 ls := ct.TestLogStream(c, desc)
165 » » if err := ls.Put(ds.Get(c)); err != nil { 174 » » if err := ds.Get(c).Put(ls); err != nil {
166 panic(err) 175 panic(err)
167 } 176 }
168 177
169 tc.Add(time.Second) 178 tc.Add(time.Second)
170 var entries []*logpb.LogEntry 179 var entries []*logpb.LogEntry
171 protobufs := map[uint64][]byte{} 180 protobufs := map[uint64][]byte{}
172 for _, v := range []int{0, 1, 2, 4, 5, 7} { 181 for _, v := range []int{0, 1, 2, 4, 5, 7} {
173 le := ct.TestLogEntry(c, ls, v) 182 le := ct.TestLogEntry(c, ls, v)
174 le.GetText().Lines = append(le.GetText().Lines, &logpb.T ext_Line{ 183 le.GetText().Lines = append(le.GetText().Lines, &logpb.T ext_Line{
175 Value: "another line of text", 184 Value: "another line of text",
(...skipping 21 matching lines...) Expand all
197 } 206 }
198 } 207 }
199 208
200 d, err := proto.Marshal(le) 209 d, err := proto.Marshal(le)
201 if err != nil { 210 if err != nil {
202 panic(err) 211 panic(err)
203 } 212 }
204 protobufs[uint64(v)] = d 213 protobufs[uint64(v)] = d
205 } 214 }
206 215
207 // Define and populate our Storage.
208 s := Server{}
209
210 ms := memoryStorage.Storage{}
211 gsc := testGSClient{}
212 s.StorageFunc = func(context.Context) (storage.Storage, error) {
213 return &ms, nil
214 }
215 s.GSClientFunc = func(context.Context) (gs.Client, error) {
216 return gsc, nil
217 }
218
219 Convey(`Testing Get requests (no logs)`, func() { 216 Convey(`Testing Get requests (no logs)`, func() {
220 req := logdog.GetRequest{ 217 req := logdog.GetRequest{
221 Path: string(ls.Path()), 218 Path: string(ls.Path()),
222 } 219 }
223 220
224 Convey(`Will fail if the Path is not a stream path or a hash.`, func() { 221 Convey(`Will fail if the Path is not a stream path or a hash.`, func() {
225 req.Path = "not/a/full/stream/path" 222 req.Path = "not/a/full/stream/path"
226 _, err := s.Get(c, &req) 223 _, err := s.Get(c, &req)
227 So(err, ShouldErrLike, "invalid path value") 224 So(err, ShouldErrLike, "invalid path value")
228 }) 225 })
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
273 Desc: desc, 270 Desc: desc,
274 Source: &src, 271 Source: &src,
275 LogWriter: &lbuf, 272 LogWriter: &lbuf,
276 IndexWriter: &ibuf, 273 IndexWriter: &ibuf,
277 StreamIndexRange: 2, 274 StreamIndexRange: 2,
278 } 275 }
279 if err := archive.Archive(m); err != nil { 276 if err := archive.Archive(m); err != nil {
280 panic(err) 277 panic(err)
281 } 278 }
282 279
283 » » » » » gsc.put("testbucket", "stream", lbuf.Byt es()) 280 » » » » » now := tc.Now().UTC()
284 » » » » » gsc.put("testbucket", "index", ibuf.Byte s()) 281
282 » » » » » gsc.put("gs://testbucket/stream", lbuf.B ytes())
283 » » » » » gsc.put("gs://testbucket/index", ibuf.By tes())
285 ls.State = coordinator.LSArchived 284 ls.State = coordinator.LSArchived
285 ls.TerminatedTime = now
286 ls.ArchivedTime = now
286 ls.ArchiveStreamURL = "gs://testbucket/s tream" 287 ls.ArchiveStreamURL = "gs://testbucket/s tream"
287 ls.ArchiveIndexURL = "gs://testbucket/in dex" 288 ls.ArchiveIndexURL = "gs://testbucket/in dex"
288 } 289 }
289 » » » » if err := ls.Put(ds.Get(c)); err != nil { 290 » » » » if err := ds.Get(c).Put(ls); err != nil {
290 panic(err) 291 panic(err)
291 } 292 }
292 293
293 Convey(`Testing Get requests`, func() { 294 Convey(`Testing Get requests`, func() {
294 req := logdog.GetRequest{ 295 req := logdog.GetRequest{
295 Path: string(ls.Path()), 296 Path: string(ls.Path()),
296 } 297 }
297 298
298 Convey(`When the log stream is purged`, func() { 299 Convey(`When the log stream is purged`, func() {
299 ls.Purged = true 300 ls.Purged = true
300 » » » » » » if err := ls.Put(ds.Get(c)); err != nil { 301 » » » » » » if err := ds.Get(c).Put(ls); err != nil {
301 panic(err) 302 panic(err)
302 } 303 }
303 304
304 Convey(`Will return NotFound if the user is not an administrator.`, func() { 305 Convey(`Will return NotFound if the user is not an administrator.`, func() {
305 _, err := s.Get(c, &req) 306 _, err := s.Get(c, &req)
306 So(err, ShouldBeRPCNotFo und) 307 So(err, ShouldBeRPCNotFo und)
307 }) 308 })
308 309
309 Convey(`Will process the request if the user is an administrator.`, func() { 310 Convey(`Will process the request if the user is an administrator.`, func() {
310 fs.IdentityGroups = []st ring{"test-administrators"} 311 fs.IdentityGroups = []st ring{"test-administrators"}
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
397 398
398 Convey(`With a byte limit of sizeof({0, 1, 2})+1, will return log entries {0, 1, 2}.`, func() { 399 Convey(`With a byte limit of sizeof({0, 1, 2})+1, will return log entries {0, 1, 2}.`, func() {
399 req.ByteCount = int32(len(protob ufs[0]) + len(protobufs[1]) + len(protobufs[2]) + 1) 400 req.ByteCount = int32(len(protob ufs[0]) + len(protobufs[1]) + len(protobufs[2]) + 1)
400 401
401 resp, err := s.Get(c, &req) 402 resp, err := s.Get(c, &req)
402 So(err, ShouldBeRPCOK) 403 So(err, ShouldBeRPCOK)
403 So(resp, shouldHaveLogs, 0, 1, 2 ) 404 So(resp, shouldHaveLogs, 0, 1, 2 )
404 }) 405 })
405 406
406 Convey(`Will successfully retrieve a str eam path hash.`, func() { 407 Convey(`Will successfully retrieve a str eam path hash.`, func() {
407 » » » » » » req.Path = ls.HashID() 408 » » » » » » req.Path = ls.HashID
408 resp, err := s.Get(c, &req) 409 resp, err := s.Get(c, &req)
409 So(err, ShouldBeRPCOK) 410 So(err, ShouldBeRPCOK)
410 So(resp, shouldHaveLogs, 0, 1, 2 ) 411 So(resp, shouldHaveLogs, 0, 1, 2 )
411 }) 412 })
412 413
413 Convey(`When requesting state`, func() { 414 Convey(`When requesting state`, func() {
414 req.State = true 415 req.State = true
415 req.LogCount = -1 416 req.LogCount = -1
416 417
417 Convey(`Will successfully retrie ve stream state.`, func() { 418 Convey(`Will successfully retrie ve stream state.`, func() {
418 resp, err := s.Get(c, &r eq) 419 resp, err := s.Get(c, &r eq)
419 So(err, ShouldBeRPCOK) 420 So(err, ShouldBeRPCOK)
420 So(resp.State, ShouldRes emble, loadLogStreamState(ls)) 421 So(resp.State, ShouldRes emble, loadLogStreamState(ls))
421 So(len(resp.Logs), Shoul dEqual, 0) 422 So(len(resp.Logs), Shoul dEqual, 0)
422 }) 423 })
423 424
424 Convey(`Will return Internal if the protobuf descriptor data is corrupt.`, func() { 425 Convey(`Will return Internal if the protobuf descriptor data is corrupt.`, func() {
425 » » » » » » » // We can't use "ls.Put" here because it validates the protobuf! 426 » » » » » » » ls.SetDSValidate(false)
426 ls.Descriptor = []byte{0 x00} // Invalid protobuf, zero tag. 427 ls.Descriptor = []byte{0 x00} // Invalid protobuf, zero tag.
427 if err := ds.Get(c).Put( ls); err != nil { 428 if err := ds.Get(c).Put( ls); err != nil {
428 panic(err) 429 panic(err)
429 } 430 }
430 431
431 _, err := s.Get(c, &req) 432 _, err := s.Get(c, &req)
432 So(err, ShouldBeRPCInter nal) 433 So(err, ShouldBeRPCInter nal)
433 }) 434 })
434 }) 435 })
435 436
436 Convey(`Will return Internal if the prot obuf log entry data is corrupt.`, func() { 437 Convey(`Will return Internal if the prot obuf log entry data is corrupt.`, func() {
437 if v { 438 if v {
438 // Corrupt the archive d atastream. 439 // Corrupt the archive d atastream.
439 » » » » » » » stream := gsc.get("testb ucket", "stream") 440 » » » » » » » stream := gsc.get("gs:// testbucket/stream")
440 zeroRecords(stream) 441 zeroRecords(stream)
441 } else { 442 } else {
442 // Add corrupted entry t o Storage. Create a new entry here, since 443 // Add corrupted entry t o Storage. Create a new entry here, since
443 // the storage will reje ct a duplicate/overwrite. 444 // the storage will reje ct a duplicate/overwrite.
444 err := ms.Put(storage.Pu tRequest{ 445 err := ms.Put(storage.Pu tRequest{
445 Path: types.St reamPath(req.Path), 446 Path: types.St reamPath(req.Path),
446 Index: 666, 447 Index: 666,
447 Values: [][]byte {{0x00}}, // Invalid protobuf, zero tag. 448 Values: [][]byte {{0x00}}, // Invalid protobuf, zero tag.
448 }) 449 })
449 if err != nil { 450 if err != nil {
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
556 Path: string(ls.Path()), 557 Path: string(ls.Path()),
557 } 558 }
558 559
559 Convey(`Will successfully retrieve a str eam path.`, func() { 560 Convey(`Will successfully retrieve a str eam path.`, func() {
560 resp, err := s.Tail(c, &req) 561 resp, err := s.Tail(c, &req)
561 So(err, ShouldBeRPCOK) 562 So(err, ShouldBeRPCOK)
562 So(resp, shouldHaveLogs, 7) 563 So(resp, shouldHaveLogs, 7)
563 }) 564 })
564 565
565 Convey(`Will successfully retrieve a str eam path hash and state.`, func() { 566 Convey(`Will successfully retrieve a str eam path hash and state.`, func() {
566 » » » » » » req.Path = ls.HashID() 567 » » » » » » req.Path = ls.HashID
567 req.State = true 568 req.State = true
568 569
569 resp, err := s.Tail(c, &req) 570 resp, err := s.Tail(c, &req)
570 So(err, ShouldBeRPCOK) 571 So(err, ShouldBeRPCOK)
571 So(resp, shouldHaveLogs, 7) 572 So(resp, shouldHaveLogs, 7)
572 So(resp.State, ShouldResemble, l oadLogStreamState(ls)) 573 So(resp.State, ShouldResemble, l oadLogStreamState(ls))
573 }) 574 })
574 }) 575 })
575 }) 576 })
576 } 577 }
577 }) 578 })
578 } 579 }
OLDNEW
« no previous file with comments | « appengine/logdog/coordinator/endpoints/logs/get.go ('k') | appengine/logdog/coordinator/endpoints/logs/list.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698