| OLD | NEW |
| 1 // Copyright 2015 The LUCI Authors. All rights reserved. | 1 // Copyright 2015 The LUCI Authors. All rights reserved. |
| 2 // Use of this source code is governed under the Apache License, Version 2.0 | 2 // Use of this source code is governed under the Apache License, Version 2.0 |
| 3 // that can be found in the LICENSE file. | 3 // that can be found in the LICENSE file. |
| 4 | 4 |
| 5 package logs | 5 package logs |
| 6 | 6 |
| 7 import ( | 7 import ( |
| 8 "bytes" | 8 "bytes" |
| 9 "errors" | 9 "errors" |
| 10 "fmt" | 10 "fmt" |
| 11 "math" | 11 "math" |
| 12 "testing" | 12 "testing" |
| 13 "time" | 13 "time" |
| 14 | 14 |
| 15 "github.com/luci/gae/filter/featureBreaker" | |
| 16 "github.com/luci/luci-go/common/config" | 15 "github.com/luci/luci-go/common/config" |
| 17 "github.com/luci/luci-go/common/data/recordio" | 16 "github.com/luci/luci-go/common/data/recordio" |
| 18 "github.com/luci/luci-go/common/iotools" | 17 "github.com/luci/luci-go/common/iotools" |
| 19 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" | 18 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" |
| 20 "github.com/luci/luci-go/logdog/api/logpb" | 19 "github.com/luci/luci-go/logdog/api/logpb" |
| 21 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest
" | 20 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest
" |
| 22 "github.com/luci/luci-go/logdog/common/archive" | 21 "github.com/luci/luci-go/logdog/common/archive" |
| 23 "github.com/luci/luci-go/logdog/common/renderer" | 22 "github.com/luci/luci-go/logdog/common/renderer" |
| 24 "github.com/luci/luci-go/logdog/common/storage" | 23 "github.com/luci/luci-go/logdog/common/storage" |
| 24 "github.com/luci/luci-go/logdog/common/storage/bigtable" |
| 25 "github.com/luci/luci-go/logdog/common/types" | 25 "github.com/luci/luci-go/logdog/common/types" |
| 26 | 26 |
| 27 "github.com/luci/gae/filter/featureBreaker" |
| 28 |
| 27 "github.com/golang/protobuf/proto" | 29 "github.com/golang/protobuf/proto" |
| 28 "golang.org/x/net/context" | 30 "golang.org/x/net/context" |
| 29 | 31 |
| 30 . "github.com/luci/luci-go/common/testing/assertions" | 32 . "github.com/luci/luci-go/common/testing/assertions" |
| 31 . "github.com/smartystreets/goconvey/convey" | 33 . "github.com/smartystreets/goconvey/convey" |
| 32 ) | 34 ) |
| 33 | 35 |
| 34 func shouldHaveLogs(actual interface{}, expected ...interface{}) string { | 36 func shouldHaveLogs(actual interface{}, expected ...interface{}) string { |
| 35 resp := actual.(*logdog.GetResponse) | 37 resp := actual.(*logdog.GetResponse) |
| 36 | 38 |
| (...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 71 trash.ReadFrom(r) | 73 trash.ReadFrom(r) |
| 72 } | 74 } |
| 73 } | 75 } |
| 74 | 76 |
| 75 func testGetImpl(t *testing.T, archived bool) { | 77 func testGetImpl(t *testing.T, archived bool) { |
| 76 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive
d=%v)`, archived), t, func() { | 78 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive
d=%v)`, archived), t, func() { |
| 77 c, env := ct.Install() | 79 c, env := ct.Install() |
| 78 | 80 |
| 79 svr := New() | 81 svr := New() |
| 80 | 82 |
| 83 // Use an actual BigTable testing instance for our intermediate
storage, |
| 84 // instead of the default in-memory instance. |
| 85 is := bigtable.NewMemoryInstance(c, bigtable.Options{ |
| 86 Cache: env.Services.StorageCache(), |
| 87 }) |
| 88 env.Services.IS = func() (storage.Storage, error) { return is, n
il } |
| 89 |
| 81 // di is a datastore bound to the test project namespace. | 90 // di is a datastore bound to the test project namespace. |
| 82 const project = config.ProjectName("proj-foo") | 91 const project = config.ProjectName("proj-foo") |
| 83 | 92 |
| 84 // Generate our test stream. | 93 // Generate our test stream. |
| 85 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") | 94 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") |
| 86 | 95 |
| 87 putLogStream := func(c context.Context) { | 96 putLogStream := func(c context.Context) { |
| 88 if err := tls.Put(c); err != nil { | 97 if err := tls.Put(c); err != nil { |
| 89 panic(err) | 98 panic(err) |
| 90 } | 99 } |
| (...skipping 188 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 279 }) | 288 }) |
| 280 | 289 |
| 281 Convey(`Will fail with NotFound if the log path does not
exist (different path).`, func() { | 290 Convey(`Will fail with NotFound if the log path does not
exist (different path).`, func() { |
| 282 req.Path = "testing/+/does/not/exist" | 291 req.Path = "testing/+/does/not/exist" |
| 283 _, err := svr.Tail(c, &req) | 292 _, err := svr.Tail(c, &req) |
| 284 So(err, ShouldBeRPCNotFound) | 293 So(err, ShouldBeRPCNotFound) |
| 285 }) | 294 }) |
| 286 }) | 295 }) |
| 287 | 296 |
| 288 Convey(`When testing log data is added`, func() { | 297 Convey(`When testing log data is added`, func() { |
| 289 » » » if !archived { | 298 » » » putLogData := func() { |
| 290 » » » » // Add the logs to the in-memory temporary stora
ge. | 299 » » » » if !archived { |
| 291 » » » » for _, le := range entries { | 300 » » » » » // Add the logs to the in-memory tempora
ry storage. |
| 292 » » » » » err := env.IntermediateStorage.Put(stora
ge.PutRequest{ | 301 » » » » » for _, le := range entries { |
| 293 » » » » » » Project: project, | 302 » » » » » » err := is.Put(storage.PutRequest
{ |
| 294 » » » » » » Path: tls.Path, | 303 » » » » » » » Project: project, |
| 295 » » » » » » Index: types.MessageIndex(le.S
treamIndex), | 304 » » » » » » » Path: tls.Path, |
| 296 » » » » » » Values: [][]byte{protobufs[le.S
treamIndex]}, | 305 » » » » » » » Index: types.MessageIn
dex(le.StreamIndex), |
| 297 » » » » » }) | 306 » » » » » » » Values: [][]byte{protob
ufs[le.StreamIndex]}, |
| 298 » » » » » if err != nil { | 307 » » » » » » }) |
| 299 » » » » » » panic(fmt.Errorf("failed to Put(
) LogEntry: %v", err)) | 308 » » » » » » if err != nil { |
| 309 » » » » » » » panic(fmt.Errorf("failed
to Put() LogEntry: %v", err)) |
| 310 » » » » » » } |
| 300 } | 311 } |
| 312 } else { |
| 313 // Archive this log stream. We will gene
rate one index entry for every |
| 314 // 2 log entries. |
| 315 src := renderer.StaticSource(entries) |
| 316 var lbuf, ibuf bytes.Buffer |
| 317 m := archive.Manifest{ |
| 318 Desc: tls.Desc, |
| 319 Source: &src, |
| 320 LogWriter: &lbuf, |
| 321 IndexWriter: &ibuf, |
| 322 StreamIndexRange: 2, |
| 323 } |
| 324 if err := archive.Archive(m); err != nil
{ |
| 325 panic(err) |
| 326 } |
| 327 |
| 328 now := env.Clock.Now().UTC() |
| 329 |
| 330 env.GSClient.Put("gs://testbucket/stream
", lbuf.Bytes()) |
| 331 env.GSClient.Put("gs://testbucket/index"
, ibuf.Bytes()) |
| 332 tls.State.TerminatedTime = now |
| 333 tls.State.ArchivedTime = now |
| 334 tls.State.ArchiveStreamURL = "gs://testb
ucket/stream" |
| 335 tls.State.ArchiveIndexURL = "gs://testbu
cket/index" |
| 336 |
| 337 So(tls.State.ArchivalState().Archived(),
ShouldBeTrue) |
| 301 } | 338 } |
| 302 } else { | |
| 303 // Archive this log stream. We will generate one
index entry for every | |
| 304 // 2 log entries. | |
| 305 src := renderer.StaticSource(entries) | |
| 306 var lbuf, ibuf bytes.Buffer | |
| 307 m := archive.Manifest{ | |
| 308 Desc: tls.Desc, | |
| 309 Source: &src, | |
| 310 LogWriter: &lbuf, | |
| 311 IndexWriter: &ibuf, | |
| 312 StreamIndexRange: 2, | |
| 313 } | |
| 314 if err := archive.Archive(m); err != nil { | |
| 315 panic(err) | |
| 316 } | |
| 317 | |
| 318 now := env.Clock.Now().UTC() | |
| 319 | |
| 320 env.GSClient.Put("gs://testbucket/stream", lbuf.
Bytes()) | |
| 321 env.GSClient.Put("gs://testbucket/index", ibuf.B
ytes()) | |
| 322 tls.State.TerminatedTime = now | |
| 323 tls.State.ArchivedTime = now | |
| 324 tls.State.ArchiveStreamURL = "gs://testbucket/st
ream" | |
| 325 tls.State.ArchiveIndexURL = "gs://testbucket/ind
ex" | |
| 326 | |
| 327 So(tls.State.ArchivalState().Archived(), ShouldB
eTrue) | |
| 328 } | 339 } |
| 340 putLogData() |
| 329 putLogStream(c) | 341 putLogStream(c) |
| 330 | 342 |
| 331 Convey(`Testing Get requests`, func() { | 343 Convey(`Testing Get requests`, func() { |
| 332 req := logdog.GetRequest{ | 344 req := logdog.GetRequest{ |
| 333 Project: string(project), | 345 Project: string(project), |
| 334 Path: string(tls.Path), | 346 Path: string(tls.Path), |
| 335 } | 347 } |
| 336 | 348 |
| 337 Convey(`When the log stream is purged`, func() { | 349 Convey(`When the log stream is purged`, func() { |
| 338 tls.Stream.Purged = true | 350 tls.Stream.Purged = true |
| (...skipping 19 matching lines...) Expand all Loading... |
| 358 | 370 |
| 359 resp, err := svr.Get(c, &req) | 371 resp, err := svr.Get(c, &req) |
| 360 So(err, ShouldBeRPCOK) | 372 So(err, ShouldBeRPCOK) |
| 361 So(resp.Logs, ShouldHaveLength, 0) | 373 So(resp.Logs, ShouldHaveLength, 0) |
| 362 }) | 374 }) |
| 363 | 375 |
| 364 Convey(`Will successfully retrieve a stream path
.`, func() { | 376 Convey(`Will successfully retrieve a stream path
.`, func() { |
| 365 resp, err := svr.Get(c, &req) | 377 resp, err := svr.Get(c, &req) |
| 366 So(err, ShouldBeRPCOK) | 378 So(err, ShouldBeRPCOK) |
| 367 So(resp, shouldHaveLogs, 0, 1, 2) | 379 So(resp, shouldHaveLogs, 0, 1, 2) |
| 380 |
| 381 Convey(`Will successfully retrieve the s
tream path again (caching).`, func() { |
| 382 // Re-put the log data, since th
e previous request closed the |
| 383 // storage instance. |
| 384 putLogData() |
| 385 |
| 386 resp, err := svr.Get(c, &req) |
| 387 So(err, ShouldBeRPCOK) |
| 388 So(resp, shouldHaveLogs, 0, 1, 2
) |
| 389 }) |
| 368 }) | 390 }) |
| 369 | 391 |
| 370 Convey(`Will successfully retrieve a stream path
offset at 4.`, func() { | 392 Convey(`Will successfully retrieve a stream path
offset at 4.`, func() { |
| 371 req.Index = 4 | 393 req.Index = 4 |
| 372 | 394 |
| 373 resp, err := svr.Get(c, &req) | 395 resp, err := svr.Get(c, &req) |
| 374 So(err, ShouldBeRPCOK) | 396 So(err, ShouldBeRPCOK) |
| 375 So(resp, shouldHaveLogs, 4, 5) | 397 So(resp, shouldHaveLogs, 4, 5) |
| 376 }) | 398 }) |
| 377 | 399 |
| (...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 462 }) | 484 }) |
| 463 | 485 |
| 464 Convey(`Will return Internal if the protobuf log
entry data is corrupt.`, func() { | 486 Convey(`Will return Internal if the protobuf log
entry data is corrupt.`, func() { |
| 465 if archived { | 487 if archived { |
| 466 // Corrupt the archive datastrea
m. | 488 // Corrupt the archive datastrea
m. |
| 467 stream := env.GSClient.Get("gs:/
/testbucket/stream") | 489 stream := env.GSClient.Get("gs:/
/testbucket/stream") |
| 468 zeroRecords(stream) | 490 zeroRecords(stream) |
| 469 } else { | 491 } else { |
| 470 // Add corrupted entry to Storag
e. Create a new entry here, since | 492 // Add corrupted entry to Storag
e. Create a new entry here, since |
| 471 // the storage will reject a dup
licate/overwrite. | 493 // the storage will reject a dup
licate/overwrite. |
| 472 » » » » » » err := env.IntermediateStorage.P
ut(storage.PutRequest{ | 494 » » » » » » err := is.Put(storage.PutRequest
{ |
| 473 Project: project, | 495 Project: project, |
| 474 Path: types.StreamPat
h(req.Path), | 496 Path: types.StreamPat
h(req.Path), |
| 475 Index: 666, | 497 Index: 666, |
| 476 Values: [][]byte{{0x00}
}, // Invalid protobuf, zero tag. | 498 Values: [][]byte{{0x00}
}, // Invalid protobuf, zero tag. |
| 477 }) | 499 }) |
| 478 if err != nil { | 500 if err != nil { |
| 479 panic(err) | 501 panic(err) |
| 480 } | 502 } |
| 481 req.Index = 666 | 503 req.Index = 666 |
| 482 } | 504 } |
| 483 | 505 |
| 484 _, err := svr.Get(c, &req) | 506 _, err := svr.Get(c, &req) |
| 485 So(err, ShouldBeRPCInternal) | 507 So(err, ShouldBeRPCInternal) |
| 486 }) | 508 }) |
| 487 | 509 |
| 488 Convey(`Will successfully retrieve both logs and
stream state.`, func() { | 510 Convey(`Will successfully retrieve both logs and
stream state.`, func() { |
| 489 req.State = true | 511 req.State = true |
| 490 | 512 |
| 491 resp, err := svr.Get(c, &req) | 513 resp, err := svr.Get(c, &req) |
| 492 So(err, ShouldBeRPCOK) | 514 So(err, ShouldBeRPCOK) |
| 493 So(resp.State, ShouldResemble, buildLogS
treamState(tls.Stream, tls.State)) | 515 So(resp.State, ShouldResemble, buildLogS
treamState(tls.Stream, tls.State)) |
| 494 So(resp, shouldHaveLogs, 0, 1, 2) | 516 So(resp, shouldHaveLogs, 0, 1, 2) |
| 495 }) | 517 }) |
| 496 | 518 |
| 497 Convey(`Will return Internal if the Storage is n
ot working.`, func() { | 519 Convey(`Will return Internal if the Storage is n
ot working.`, func() { |
| 498 if archived { | 520 if archived { |
| 499 env.GSClient["error"] = []byte("
test error") | 521 env.GSClient["error"] = []byte("
test error") |
| 500 } else { | 522 } else { |
| 501 » » » » » » env.IntermediateStorage.Close() | 523 » » » » » » is.SetErr(errors.New("not workin
g")) |
| 502 } | 524 } |
| 503 | 525 |
| 504 _, err := svr.Get(c, &req) | 526 _, err := svr.Get(c, &req) |
| 505 So(err, ShouldBeRPCInternal) | 527 So(err, ShouldBeRPCInternal) |
| 506 }) | 528 }) |
| 507 | 529 |
| 508 Convey(`Will enforce a maximum count of 2.`, fun
c() { | 530 Convey(`Will enforce a maximum count of 2.`, fun
c() { |
| 509 req.LogCount = 2 | 531 req.LogCount = 2 |
| 510 resp, err := svr.Get(c, &req) | 532 resp, err := svr.Get(c, &req) |
| 511 So(err, ShouldBeRPCOK) | 533 So(err, ShouldBeRPCOK) |
| (...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 585 Project: string(project), | 607 Project: string(project), |
| 586 Path: string(tls.Path), | 608 Path: string(tls.Path), |
| 587 State: true, | 609 State: true, |
| 588 } | 610 } |
| 589 | 611 |
| 590 Convey(`Will successfully retrieve a stream path
.`, func() { | 612 Convey(`Will successfully retrieve a stream path
.`, func() { |
| 591 resp, err := svr.Tail(c, &req) | 613 resp, err := svr.Tail(c, &req) |
| 592 So(err, ShouldBeRPCOK) | 614 So(err, ShouldBeRPCOK) |
| 593 So(resp, shouldHaveLogs, 7) | 615 So(resp, shouldHaveLogs, 7) |
| 594 So(resp.State, ShouldResemble, buildLogS
treamState(tls.Stream, tls.State)) | 616 So(resp.State, ShouldResemble, buildLogS
treamState(tls.Stream, tls.State)) |
| 617 |
| 618 // For non-archival: 1 miss and 1 put, f
or the tail row. |
| 619 // For archival: 1 miss and 1 put, for t
he index. |
| 620 So(env.StorageCache.Stats(), ShouldResem
ble, ct.StorageCacheStats{Puts: 1, Misses: 1}) |
| 621 |
| 622 Convey(`Will retrieve the stream path ag
ain (caching).`, func() { |
| 623 // Re-put the log data, since th
e previous request closed the |
| 624 // storage instance. |
| 625 putLogData() |
| 626 env.StorageCache.Clear() |
| 627 |
| 628 resp, err := svr.Tail(c, &req) |
| 629 So(err, ShouldBeRPCOK) |
| 630 So(resp, shouldHaveLogs, 7) |
| 631 So(resp.State, ShouldResemble, b
uildLogStreamState(tls.Stream, tls.State)) |
| 632 |
| 633 // For non-archival: 1 hit, for
the tail row. |
| 634 // For archival: 1 hit, for the
index. |
| 635 So(env.StorageCache.Stats(), Sho
uldResemble, ct.StorageCacheStats{Hits: 1}) |
| 636 }) |
| 595 }) | 637 }) |
| 596 }) | 638 }) |
| 597 }) | 639 }) |
| 598 }) | 640 }) |
| 599 } | 641 } |
| 600 | 642 |
| 601 func TestGetIntermediate(t *testing.T) { | 643 func TestGetIntermediate(t *testing.T) { |
| 602 t.Parallel() | 644 t.Parallel() |
| 603 | 645 |
| 604 testGetImpl(t, false) | 646 testGetImpl(t, false) |
| 605 } | 647 } |
| 606 | 648 |
| 607 func TestGetArchived(t *testing.T) { | 649 func TestGetArchived(t *testing.T) { |
| 608 t.Parallel() | 650 t.Parallel() |
| 609 | 651 |
| 610 » testGetImpl(t, false) | 652 » testGetImpl(t, true) |
| 611 } | 653 } |
| OLD | NEW |