Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(430)

Side by Side Diff: logdog/appengine/coordinator/endpoints/logs/get_test.go

Issue 2538203002: LogDog: Add signed GS URL fetching. (Closed)
Patch Set: Allow index signing, use gaesigner. Created 4 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 "math" 11 "math"
12 "testing" 12 "testing"
13 "time" 13 "time"
14 14
15 "github.com/luci/luci-go/common/clock"
15 "github.com/luci/luci-go/common/config" 16 "github.com/luci/luci-go/common/config"
16 "github.com/luci/luci-go/common/data/recordio" 17 "github.com/luci/luci-go/common/data/recordio"
17 "github.com/luci/luci-go/common/iotools" 18 "github.com/luci/luci-go/common/iotools"
19 "github.com/luci/luci-go/common/proto/google"
18 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" 20 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
19 "github.com/luci/luci-go/logdog/api/logpb" 21 "github.com/luci/luci-go/logdog/api/logpb"
20 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest " 22 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest "
21 "github.com/luci/luci-go/logdog/common/archive" 23 "github.com/luci/luci-go/logdog/common/archive"
22 "github.com/luci/luci-go/logdog/common/renderer" 24 "github.com/luci/luci-go/logdog/common/renderer"
23 "github.com/luci/luci-go/logdog/common/storage" 25 "github.com/luci/luci-go/logdog/common/storage"
24 "github.com/luci/luci-go/logdog/common/storage/bigtable"
25 "github.com/luci/luci-go/logdog/common/types" 26 "github.com/luci/luci-go/logdog/common/types"
26 27
27 "github.com/luci/gae/filter/featureBreaker" 28 "github.com/luci/gae/filter/featureBreaker"
28 29
29 "github.com/golang/protobuf/proto" 30 "github.com/golang/protobuf/proto"
30 "golang.org/x/net/context" 31 "golang.org/x/net/context"
31 32
32 . "github.com/luci/luci-go/common/testing/assertions" 33 . "github.com/luci/luci-go/common/testing/assertions"
33 . "github.com/smartystreets/goconvey/convey" 34 . "github.com/smartystreets/goconvey/convey"
34 ) 35 )
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
73 trash.ReadFrom(r) 74 trash.ReadFrom(r)
74 } 75 }
75 } 76 }
76 77
77 func testGetImpl(t *testing.T, archived bool) { 78 func testGetImpl(t *testing.T, archived bool) {
78 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive d=%v)`, archived), t, func() { 79 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive d=%v)`, archived), t, func() {
79 c, env := ct.Install() 80 c, env := ct.Install()
80 81
81 svr := New() 82 svr := New()
82 83
83 // Use an actual BigTable testing instance for our intermediate storage,
84 // instead of the default in-memory instance.
85 is := bigtable.NewMemoryInstance(c, bigtable.Options{
86 Cache: env.Services.StorageCache(),
87 })
88 env.Services.IS = func() (storage.Storage, error) { return is, n il }
89
90 // di is a datastore bound to the test project namespace. 84 // di is a datastore bound to the test project namespace.
91 const project = config.ProjectName("proj-foo") 85 const project = config.ProjectName("proj-foo")
92 86
93 // Generate our test stream. 87 // Generate our test stream.
94 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") 88 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar")
95 89
96 putLogStream := func(c context.Context) { 90 putLogStream := func(c context.Context) {
97 if err := tls.Put(c); err != nil { 91 if err := tls.Put(c); err != nil {
98 panic(err) 92 panic(err)
99 } 93 }
(...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after
292 _, err := svr.Tail(c, &req) 286 _, err := svr.Tail(c, &req)
293 So(err, ShouldBeRPCNotFound) 287 So(err, ShouldBeRPCNotFound)
294 }) 288 })
295 }) 289 })
296 290
297 Convey(`When testing log data is added`, func() { 291 Convey(`When testing log data is added`, func() {
298 putLogData := func() { 292 putLogData := func() {
299 if !archived { 293 if !archived {
300 // Add the logs to the in-memory tempora ry storage. 294 // Add the logs to the in-memory tempora ry storage.
301 for _, le := range entries { 295 for _, le := range entries {
302 » » » » » » err := is.Put(storage.PutRequest { 296 » » » » » » err := env.BigTable.Put(storage. PutRequest{
303 Project: project, 297 Project: project,
304 Path: tls.Path, 298 Path: tls.Path,
305 Index: types.MessageIn dex(le.StreamIndex), 299 Index: types.MessageIn dex(le.StreamIndex),
306 Values: [][]byte{protob ufs[le.StreamIndex]}, 300 Values: [][]byte{protob ufs[le.StreamIndex]},
307 }) 301 })
308 if err != nil { 302 if err != nil {
309 panic(fmt.Errorf("failed to Put() LogEntry: %v", err)) 303 panic(fmt.Errorf("failed to Put() LogEntry: %v", err))
310 } 304 }
311 } 305 }
312 } else { 306 } else {
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after
372 So(err, ShouldBeRPCOK) 366 So(err, ShouldBeRPCOK)
373 So(resp.Logs, ShouldHaveLength, 0) 367 So(resp.Logs, ShouldHaveLength, 0)
374 }) 368 })
375 369
376 Convey(`Will successfully retrieve a stream path .`, func() { 370 Convey(`Will successfully retrieve a stream path .`, func() {
377 resp, err := svr.Get(c, &req) 371 resp, err := svr.Get(c, &req)
378 So(err, ShouldBeRPCOK) 372 So(err, ShouldBeRPCOK)
379 So(resp, shouldHaveLogs, 0, 1, 2) 373 So(resp, shouldHaveLogs, 0, 1, 2)
380 374
381 Convey(`Will successfully retrieve the s tream path again (caching).`, func() { 375 Convey(`Will successfully retrieve the s tream path again (caching).`, func() {
382 // Re-put the log data, since th e previous request closed the
383 // storage instance.
384 putLogData()
385
386 resp, err := svr.Get(c, &req) 376 resp, err := svr.Get(c, &req)
387 So(err, ShouldBeRPCOK) 377 So(err, ShouldBeRPCOK)
388 So(resp, shouldHaveLogs, 0, 1, 2 ) 378 So(resp, shouldHaveLogs, 0, 1, 2 )
389 }) 379 })
390 }) 380 })
391 381
392 Convey(`Will successfully retrieve a stream path offset at 4.`, func() { 382 Convey(`Will successfully retrieve a stream path offset at 4.`, func() {
393 req.Index = 4 383 req.Index = 4
394 384
395 resp, err := svr.Get(c, &req) 385 resp, err := svr.Get(c, &req)
(...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after
476 Convey(`Will return Internal if the prot obuf descriptor data is corrupt.`, func() { 466 Convey(`Will return Internal if the prot obuf descriptor data is corrupt.`, func() {
477 tls.Stream.SetDSValidate(false) 467 tls.Stream.SetDSValidate(false)
478 tls.Stream.Descriptor = []byte{0 x00} // Invalid protobuf, zero tag. 468 tls.Stream.Descriptor = []byte{0 x00} // Invalid protobuf, zero tag.
479 putLogStream(c) 469 putLogStream(c)
480 470
481 _, err := svr.Get(c, &req) 471 _, err := svr.Get(c, &req)
482 So(err, ShouldBeRPCInternal) 472 So(err, ShouldBeRPCInternal)
483 }) 473 })
484 }) 474 })
485 475
476 Convey(`When requesting a signed URL`, func() {
477 const duration = 10 * time.Hour
478 req.LogCount = -1
479
480 sr := logdog.GetRequest_SignURLRequest{
481 Lifetime: google.NewDuration(dur ation),
482 Stream: true,
483 Index: true,
484 }
485 req.GetSignedUrls = &sr
486
487 if archived {
488 Convey(`Will successfully retrie ve the URL.`, func() {
489 resp, err := svr.Get(c, &req)
490 So(err, ShouldBeNil)
491 So(resp.Logs, ShouldHave Length, 0)
492
493 So(resp.SignedUrls, Shou ldNotBeNil)
494 So(resp.SignedUrls.Strea m, ShouldEndWith, "&signed=true")
495 So(resp.SignedUrls.Index , ShouldEndWith, "&signed=true")
496 So(resp.SignedUrls.Expir ation.Time(), ShouldResemble, clock.Now(c).Add(duration))
497 })
498 } else {
499 Convey(`Will succeed, but return no URL.`, func() {
500 resp, err := svr.Get(c, &req)
501 So(err, ShouldBeNil)
502 So(resp.Logs, ShouldHave Length, 0)
503 So(resp.SignedUrls, Shou ldBeNil)
504 })
505 }
506 })
507
486 Convey(`Will return Internal if the protobuf log entry data is corrupt.`, func() { 508 Convey(`Will return Internal if the protobuf log entry data is corrupt.`, func() {
487 if archived { 509 if archived {
488 // Corrupt the archive datastrea m. 510 // Corrupt the archive datastrea m.
489 stream := env.GSClient.Get("gs:/ /testbucket/stream") 511 stream := env.GSClient.Get("gs:/ /testbucket/stream")
490 zeroRecords(stream) 512 zeroRecords(stream)
491 } else { 513 } else {
492 // Add corrupted entry to Storag e. Create a new entry here, since 514 // Add corrupted entry to Storag e. Create a new entry here, since
493 // the storage will reject a dup licate/overwrite. 515 // the storage will reject a dup licate/overwrite.
494 » » » » » » err := is.Put(storage.PutRequest { 516 » » » » » » err := env.BigTable.Put(storage. PutRequest{
495 Project: project, 517 Project: project,
496 Path: types.StreamPat h(req.Path), 518 Path: types.StreamPat h(req.Path),
497 Index: 666, 519 Index: 666,
498 Values: [][]byte{{0x00} }, // Invalid protobuf, zero tag. 520 Values: [][]byte{{0x00} }, // Invalid protobuf, zero tag.
499 }) 521 })
500 if err != nil { 522 if err != nil {
501 panic(err) 523 panic(err)
502 } 524 }
503 req.Index = 666 525 req.Index = 666
504 } 526 }
505 527
506 _, err := svr.Get(c, &req) 528 _, err := svr.Get(c, &req)
507 So(err, ShouldBeRPCInternal) 529 So(err, ShouldBeRPCInternal)
508 }) 530 })
509 531
510 Convey(`Will successfully retrieve both logs and stream state.`, func() { 532 Convey(`Will successfully retrieve both logs and stream state.`, func() {
511 req.State = true 533 req.State = true
512 534
513 resp, err := svr.Get(c, &req) 535 resp, err := svr.Get(c, &req)
514 So(err, ShouldBeRPCOK) 536 So(err, ShouldBeRPCOK)
515 So(resp.State, ShouldResemble, buildLogS treamState(tls.Stream, tls.State)) 537 So(resp.State, ShouldResemble, buildLogS treamState(tls.Stream, tls.State))
516 So(resp, shouldHaveLogs, 0, 1, 2) 538 So(resp, shouldHaveLogs, 0, 1, 2)
517 }) 539 })
518 540
519 Convey(`Will return Internal if the Storage is n ot working.`, func() { 541 Convey(`Will return Internal if the Storage is n ot working.`, func() {
520 if archived { 542 if archived {
521 env.GSClient["error"] = []byte(" test error") 543 env.GSClient["error"] = []byte(" test error")
522 } else { 544 } else {
523 » » » » » » is.SetErr(errors.New("not workin g")) 545 » » » » » » env.BigTable.SetErr(errors.New(" not working"))
524 } 546 }
525 547
526 _, err := svr.Get(c, &req) 548 _, err := svr.Get(c, &req)
527 So(err, ShouldBeRPCInternal) 549 So(err, ShouldBeRPCInternal)
528 }) 550 })
529 551
530 Convey(`Will enforce a maximum count of 2.`, fun c() { 552 Convey(`Will enforce a maximum count of 2.`, fun c() {
531 req.LogCount = 2 553 req.LogCount = 2
532 resp, err := svr.Get(c, &req) 554 resp, err := svr.Get(c, &req)
533 So(err, ShouldBeRPCOK) 555 So(err, ShouldBeRPCOK)
(...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after
613 resp, err := svr.Tail(c, &req) 635 resp, err := svr.Tail(c, &req)
614 So(err, ShouldBeRPCOK) 636 So(err, ShouldBeRPCOK)
615 So(resp, shouldHaveLogs, 7) 637 So(resp, shouldHaveLogs, 7)
616 So(resp.State, ShouldResemble, buildLogS treamState(tls.Stream, tls.State)) 638 So(resp.State, ShouldResemble, buildLogS treamState(tls.Stream, tls.State))
617 639
618 // For non-archival: 1 miss and 1 put, f or the tail row. 640 // For non-archival: 1 miss and 1 put, f or the tail row.
619 // For archival: 1 miss and 1 put, for t he index. 641 // For archival: 1 miss and 1 put, for t he index.
620 So(env.StorageCache.Stats(), ShouldResem ble, ct.StorageCacheStats{Puts: 1, Misses: 1}) 642 So(env.StorageCache.Stats(), ShouldResem ble, ct.StorageCacheStats{Puts: 1, Misses: 1})
621 643
622 Convey(`Will retrieve the stream path ag ain (caching).`, func() { 644 Convey(`Will retrieve the stream path ag ain (caching).`, func() {
623 // Re-put the log data, since th e previous request closed the
624 // storage instance.
625 putLogData()
626 env.StorageCache.Clear() 645 env.StorageCache.Clear()
627 646
628 resp, err := svr.Tail(c, &req) 647 resp, err := svr.Tail(c, &req)
629 So(err, ShouldBeRPCOK) 648 So(err, ShouldBeRPCOK)
630 So(resp, shouldHaveLogs, 7) 649 So(resp, shouldHaveLogs, 7)
631 So(resp.State, ShouldResemble, b uildLogStreamState(tls.Stream, tls.State)) 650 So(resp.State, ShouldResemble, b uildLogStreamState(tls.Stream, tls.State))
632 651
633 // For non-archival: 1 hit, for the tail row. 652 // For non-archival: 1 hit, for the tail row.
634 // For archival: 1 hit, for the index. 653 // For archival: 1 hit, for the index.
635 So(env.StorageCache.Stats(), Sho uldResemble, ct.StorageCacheStats{Hits: 1}) 654 So(env.StorageCache.Stats(), Sho uldResemble, ct.StorageCacheStats{Hits: 1})
636 }) 655 })
637 }) 656 })
638 }) 657 })
639 }) 658 })
640 }) 659 })
641 } 660 }
642 661
643 func TestGetIntermediate(t *testing.T) { 662 func TestGetIntermediate(t *testing.T) {
644 t.Parallel() 663 t.Parallel()
645 664
646 testGetImpl(t, false) 665 testGetImpl(t, false)
647 } 666 }
648 667
649 func TestGetArchived(t *testing.T) { 668 func TestGetArchived(t *testing.T) {
650 t.Parallel() 669 t.Parallel()
651 670
652 testGetImpl(t, true) 671 testGetImpl(t, true)
653 } 672 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698