| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 "math" | 11 "math" |
| 12 "testing" | 12 "testing" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "github.com/luci/luci-go/common/clock" |
| 15 "github.com/luci/luci-go/common/config" | 16 "github.com/luci/luci-go/common/config" |
| 16 "github.com/luci/luci-go/common/data/recordio" | 17 "github.com/luci/luci-go/common/data/recordio" |
| 17 "github.com/luci/luci-go/common/iotools" | 18 "github.com/luci/luci-go/common/iotools" |
| 19 "github.com/luci/luci-go/common/proto/google" |
| 18 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" | 20 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
| 19 "github.com/luci/luci-go/logdog/api/logpb" | 21 "github.com/luci/luci-go/logdog/api/logpb" |
| 20 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest
" | 22 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest
" |
| 21 "github.com/luci/luci-go/logdog/common/archive" | 23 "github.com/luci/luci-go/logdog/common/archive" |
| 22 "github.com/luci/luci-go/logdog/common/renderer" | 24 "github.com/luci/luci-go/logdog/common/renderer" |
| 23 "github.com/luci/luci-go/logdog/common/storage" | 25 "github.com/luci/luci-go/logdog/common/storage" |
| 24 "github.com/luci/luci-go/logdog/common/storage/bigtable" | |
| 25 "github.com/luci/luci-go/logdog/common/types" | 26 "github.com/luci/luci-go/logdog/common/types" |
| 26 | 27 |
| 27 "github.com/luci/gae/filter/featureBreaker" | 28 "github.com/luci/gae/filter/featureBreaker" |
| 28 | 29 |
| 29 "github.com/golang/protobuf/proto" | 30 "github.com/golang/protobuf/proto" |
| 30 "golang.org/x/net/context" | 31 "golang.org/x/net/context" |
| 31 | 32 |
| 32 . "github.com/luci/luci-go/common/testing/assertions" | 33 . "github.com/luci/luci-go/common/testing/assertions" |
| 33 . "github.com/smartystreets/goconvey/convey" | 34 . "github.com/smartystreets/goconvey/convey" |
| 34 ) | 35 ) |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 73 trash.ReadFrom(r) | 74 trash.ReadFrom(r) |
| 74 } | 75 } |
| 75 } | 76 } |
| 76 | 77 |
| 77 func testGetImpl(t *testing.T, archived bool) { | 78 func testGetImpl(t *testing.T, archived bool) { |
| 78 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive
d=%v)`, archived), t, func() { | 79 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive
d=%v)`, archived), t, func() { |
| 79 c, env := ct.Install() | 80 c, env := ct.Install() |
| 80 | 81 |
| 81 svr := New() | 82 svr := New() |
| 82 | 83 |
| 83 // Use an actual BigTable testing instance for our intermediate
storage, | |
| 84 // instead of the default in-memory instance. | |
| 85 is := bigtable.NewMemoryInstance(c, bigtable.Options{ | |
| 86 Cache: env.Services.StorageCache(), | |
| 87 }) | |
| 88 env.Services.IS = func() (storage.Storage, error) { return is, n
il } | |
| 89 | |
| 90 // di is a datastore bound to the test project namespace. | 84 // di is a datastore bound to the test project namespace. |
| 91 const project = config.ProjectName("proj-foo") | 85 const project = config.ProjectName("proj-foo") |
| 92 | 86 |
| 93 // Generate our test stream. | 87 // Generate our test stream. |
| 94 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") | 88 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") |
| 95 | 89 |
| 96 putLogStream := func(c context.Context) { | 90 putLogStream := func(c context.Context) { |
| 97 if err := tls.Put(c); err != nil { | 91 if err := tls.Put(c); err != nil { |
| 98 panic(err) | 92 panic(err) |
| 99 } | 93 } |
| (...skipping 192 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 292 _, err := svr.Tail(c, &req) | 286 _, err := svr.Tail(c, &req) |
| 293 So(err, ShouldBeRPCNotFound) | 287 So(err, ShouldBeRPCNotFound) |
| 294 }) | 288 }) |
| 295 }) | 289 }) |
| 296 | 290 |
| 297 Convey(`When testing log data is added`, func() { | 291 Convey(`When testing log data is added`, func() { |
| 298 putLogData := func() { | 292 putLogData := func() { |
| 299 if !archived { | 293 if !archived { |
| 300 // Add the logs to the in-memory tempora
ry storage. | 294 // Add the logs to the in-memory tempora
ry storage. |
| 301 for _, le := range entries { | 295 for _, le := range entries { |
| 302 » » » » » » err := is.Put(storage.PutRequest
{ | 296 » » » » » » err := env.BigTable.Put(storage.
PutRequest{ |
| 303 Project: project, | 297 Project: project, |
| 304 Path: tls.Path, | 298 Path: tls.Path, |
| 305 Index: types.MessageIn
dex(le.StreamIndex), | 299 Index: types.MessageIn
dex(le.StreamIndex), |
| 306 Values: [][]byte{protob
ufs[le.StreamIndex]}, | 300 Values: [][]byte{protob
ufs[le.StreamIndex]}, |
| 307 }) | 301 }) |
| 308 if err != nil { | 302 if err != nil { |
| 309 panic(fmt.Errorf("failed
to Put() LogEntry: %v", err)) | 303 panic(fmt.Errorf("failed
to Put() LogEntry: %v", err)) |
| 310 } | 304 } |
| 311 } | 305 } |
| 312 } else { | 306 } else { |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 372 So(err, ShouldBeRPCOK) | 366 So(err, ShouldBeRPCOK) |
| 373 So(resp.Logs, ShouldHaveLength, 0) | 367 So(resp.Logs, ShouldHaveLength, 0) |
| 374 }) | 368 }) |
| 375 | 369 |
| 376 Convey(`Will successfully retrieve a stream path
.`, func() { | 370 Convey(`Will successfully retrieve a stream path
.`, func() { |
| 377 resp, err := svr.Get(c, &req) | 371 resp, err := svr.Get(c, &req) |
| 378 So(err, ShouldBeRPCOK) | 372 So(err, ShouldBeRPCOK) |
| 379 So(resp, shouldHaveLogs, 0, 1, 2) | 373 So(resp, shouldHaveLogs, 0, 1, 2) |
| 380 | 374 |
| 381 Convey(`Will successfully retrieve the s
tream path again (caching).`, func() { | 375 Convey(`Will successfully retrieve the s
tream path again (caching).`, func() { |
| 382 // Re-put the log data, since th
e previous request closed the | |
| 383 // storage instance. | |
| 384 putLogData() | |
| 385 | |
| 386 resp, err := svr.Get(c, &req) | 376 resp, err := svr.Get(c, &req) |
| 387 So(err, ShouldBeRPCOK) | 377 So(err, ShouldBeRPCOK) |
| 388 So(resp, shouldHaveLogs, 0, 1, 2
) | 378 So(resp, shouldHaveLogs, 0, 1, 2
) |
| 389 }) | 379 }) |
| 390 }) | 380 }) |
| 391 | 381 |
| 392 Convey(`Will successfully retrieve a stream path
offset at 4.`, func() { | 382 Convey(`Will successfully retrieve a stream path
offset at 4.`, func() { |
| 393 req.Index = 4 | 383 req.Index = 4 |
| 394 | 384 |
| 395 resp, err := svr.Get(c, &req) | 385 resp, err := svr.Get(c, &req) |
| (...skipping 80 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 476 Convey(`Will return Internal if the prot
obuf descriptor data is corrupt.`, func() { | 466 Convey(`Will return Internal if the prot
obuf descriptor data is corrupt.`, func() { |
| 477 tls.Stream.SetDSValidate(false) | 467 tls.Stream.SetDSValidate(false) |
| 478 tls.Stream.Descriptor = []byte{0
x00} // Invalid protobuf, zero tag. | 468 tls.Stream.Descriptor = []byte{0
x00} // Invalid protobuf, zero tag. |
| 479 putLogStream(c) | 469 putLogStream(c) |
| 480 | 470 |
| 481 _, err := svr.Get(c, &req) | 471 _, err := svr.Get(c, &req) |
| 482 So(err, ShouldBeRPCInternal) | 472 So(err, ShouldBeRPCInternal) |
| 483 }) | 473 }) |
| 484 }) | 474 }) |
| 485 | 475 |
| 476 Convey(`When requesting a signed URL`, func() { |
| 477 const duration = 10 * time.Hour |
| 478 req.LogCount = -1 |
| 479 |
| 480 sr := logdog.GetRequest_SignURLRequest{ |
| 481 Lifetime: google.NewDuration(dur
ation), |
| 482 Stream: true, |
| 483 Index: true, |
| 484 } |
| 485 req.GetSignedUrls = &sr |
| 486 |
| 487 if archived { |
| 488 Convey(`Will successfully retrie
ve the URL.`, func() { |
| 489 resp, err := svr.Get(c,
&req) |
| 490 So(err, ShouldBeNil) |
| 491 So(resp.Logs, ShouldHave
Length, 0) |
| 492 |
| 493 So(resp.SignedUrls, Shou
ldNotBeNil) |
| 494 So(resp.SignedUrls.Strea
m, ShouldEndWith, "&signed=true") |
| 495 So(resp.SignedUrls.Index
, ShouldEndWith, "&signed=true") |
| 496 So(resp.SignedUrls.Expir
ation.Time(), ShouldResemble, clock.Now(c).Add(duration)) |
| 497 }) |
| 498 } else { |
| 499 Convey(`Will succeed, but return
no URL.`, func() { |
| 500 resp, err := svr.Get(c,
&req) |
| 501 So(err, ShouldBeNil) |
| 502 So(resp.Logs, ShouldHave
Length, 0) |
| 503 So(resp.SignedUrls, Shou
ldBeNil) |
| 504 }) |
| 505 } |
| 506 }) |
| 507 |
| 486 Convey(`Will return Internal if the protobuf log
entry data is corrupt.`, func() { | 508 Convey(`Will return Internal if the protobuf log
entry data is corrupt.`, func() { |
| 487 if archived { | 509 if archived { |
| 488 // Corrupt the archive datastrea
m. | 510 // Corrupt the archive datastrea
m. |
| 489 stream := env.GSClient.Get("gs:/
/testbucket/stream") | 511 stream := env.GSClient.Get("gs:/
/testbucket/stream") |
| 490 zeroRecords(stream) | 512 zeroRecords(stream) |
| 491 } else { | 513 } else { |
| 492 // Add corrupted entry to Storag
e. Create a new entry here, since | 514 // Add corrupted entry to Storag
e. Create a new entry here, since |
| 493 // the storage will reject a dup
licate/overwrite. | 515 // the storage will reject a dup
licate/overwrite. |
| 494 » » » » » » err := is.Put(storage.PutRequest
{ | 516 » » » » » » err := env.BigTable.Put(storage.
PutRequest{ |
| 495 Project: project, | 517 Project: project, |
| 496 Path: types.StreamPat
h(req.Path), | 518 Path: types.StreamPat
h(req.Path), |
| 497 Index: 666, | 519 Index: 666, |
| 498 Values: [][]byte{{0x00}
}, // Invalid protobuf, zero tag. | 520 Values: [][]byte{{0x00}
}, // Invalid protobuf, zero tag. |
| 499 }) | 521 }) |
| 500 if err != nil { | 522 if err != nil { |
| 501 panic(err) | 523 panic(err) |
| 502 } | 524 } |
| 503 req.Index = 666 | 525 req.Index = 666 |
| 504 } | 526 } |
| 505 | 527 |
| 506 _, err := svr.Get(c, &req) | 528 _, err := svr.Get(c, &req) |
| 507 So(err, ShouldBeRPCInternal) | 529 So(err, ShouldBeRPCInternal) |
| 508 }) | 530 }) |
| 509 | 531 |
| 510 Convey(`Will successfully retrieve both logs and
stream state.`, func() { | 532 Convey(`Will successfully retrieve both logs and
stream state.`, func() { |
| 511 req.State = true | 533 req.State = true |
| 512 | 534 |
| 513 resp, err := svr.Get(c, &req) | 535 resp, err := svr.Get(c, &req) |
| 514 So(err, ShouldBeRPCOK) | 536 So(err, ShouldBeRPCOK) |
| 515 So(resp.State, ShouldResemble, buildLogS
treamState(tls.Stream, tls.State)) | 537 So(resp.State, ShouldResemble, buildLogS
treamState(tls.Stream, tls.State)) |
| 516 So(resp, shouldHaveLogs, 0, 1, 2) | 538 So(resp, shouldHaveLogs, 0, 1, 2) |
| 517 }) | 539 }) |
| 518 | 540 |
| 519 Convey(`Will return Internal if the Storage is n
ot working.`, func() { | 541 Convey(`Will return Internal if the Storage is n
ot working.`, func() { |
| 520 if archived { | 542 if archived { |
| 521 env.GSClient["error"] = []byte("
test error") | 543 env.GSClient["error"] = []byte("
test error") |
| 522 } else { | 544 } else { |
| 523 » » » » » » is.SetErr(errors.New("not workin
g")) | 545 » » » » » » env.BigTable.SetErr(errors.New("
not working")) |
| 524 } | 546 } |
| 525 | 547 |
| 526 _, err := svr.Get(c, &req) | 548 _, err := svr.Get(c, &req) |
| 527 So(err, ShouldBeRPCInternal) | 549 So(err, ShouldBeRPCInternal) |
| 528 }) | 550 }) |
| 529 | 551 |
| 530 Convey(`Will enforce a maximum count of 2.`, fun
c() { | 552 Convey(`Will enforce a maximum count of 2.`, fun
c() { |
| 531 req.LogCount = 2 | 553 req.LogCount = 2 |
| 532 resp, err := svr.Get(c, &req) | 554 resp, err := svr.Get(c, &req) |
| 533 So(err, ShouldBeRPCOK) | 555 So(err, ShouldBeRPCOK) |
| (...skipping 79 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 613 resp, err := svr.Tail(c, &req) | 635 resp, err := svr.Tail(c, &req) |
| 614 So(err, ShouldBeRPCOK) | 636 So(err, ShouldBeRPCOK) |
| 615 So(resp, shouldHaveLogs, 7) | 637 So(resp, shouldHaveLogs, 7) |
| 616 So(resp.State, ShouldResemble, buildLogS
treamState(tls.Stream, tls.State)) | 638 So(resp.State, ShouldResemble, buildLogS
treamState(tls.Stream, tls.State)) |
| 617 | 639 |
| 618 // For non-archival: 1 miss and 1 put, f
or the tail row. | 640 // For non-archival: 1 miss and 1 put, f
or the tail row. |
| 619 // For archival: 1 miss and 1 put, for t
he index. | 641 // For archival: 1 miss and 1 put, for t
he index. |
| 620 So(env.StorageCache.Stats(), ShouldResem
ble, ct.StorageCacheStats{Puts: 1, Misses: 1}) | 642 So(env.StorageCache.Stats(), ShouldResem
ble, ct.StorageCacheStats{Puts: 1, Misses: 1}) |
| 621 | 643 |
| 622 Convey(`Will retrieve the stream path ag
ain (caching).`, func() { | 644 Convey(`Will retrieve the stream path ag
ain (caching).`, func() { |
| 623 // Re-put the log data, since th
e previous request closed the | |
| 624 // storage instance. | |
| 625 putLogData() | |
| 626 env.StorageCache.Clear() | 645 env.StorageCache.Clear() |
| 627 | 646 |
| 628 resp, err := svr.Tail(c, &req) | 647 resp, err := svr.Tail(c, &req) |
| 629 So(err, ShouldBeRPCOK) | 648 So(err, ShouldBeRPCOK) |
| 630 So(resp, shouldHaveLogs, 7) | 649 So(resp, shouldHaveLogs, 7) |
| 631 So(resp.State, ShouldResemble, b
uildLogStreamState(tls.Stream, tls.State)) | 650 So(resp.State, ShouldResemble, b
uildLogStreamState(tls.Stream, tls.State)) |
| 632 | 651 |
| 633 // For non-archival: 1 hit, for
the tail row. | 652 // For non-archival: 1 hit, for
the tail row. |
| 634 // For archival: 1 hit, for the
index. | 653 // For archival: 1 hit, for the
index. |
| 635 So(env.StorageCache.Stats(), Sho
uldResemble, ct.StorageCacheStats{Hits: 1}) | 654 So(env.StorageCache.Stats(), Sho
uldResemble, ct.StorageCacheStats{Hits: 1}) |
| 636 }) | 655 }) |
| 637 }) | 656 }) |
| 638 }) | 657 }) |
| 639 }) | 658 }) |
| 640 }) | 659 }) |
| 641 } | 660 } |
| 642 | 661 |
| 643 func TestGetIntermediate(t *testing.T) { | 662 func TestGetIntermediate(t *testing.T) { |
| 644 t.Parallel() | 663 t.Parallel() |
| 645 | 664 |
| 646 testGetImpl(t, false) | 665 testGetImpl(t, false) |
| 647 } | 666 } |
| 648 | 667 |
| 649 func TestGetArchived(t *testing.T) { | 668 func TestGetArchived(t *testing.T) { |
| 650 t.Parallel() | 669 t.Parallel() |
| 651 | 670 |
| 652 testGetImpl(t, true) | 671 testGetImpl(t, true) |
| 653 } | 672 } |
| OLD | NEW |