Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(485)

Side by Side Diff: logdog/appengine/coordinator/endpoints/logs/get_test.go

Issue 2435113002: LogDog: Add Storage-layer data caching. (Closed)
Patch Set: Fix byteLimit bug. Created 4 years, 1 month ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2015 The LUCI Authors. All rights reserved. 1 // Copyright 2015 The LUCI Authors. All rights reserved.
2 // Use of this source code is governed under the Apache License, Version 2.0 2 // Use of this source code is governed under the Apache License, Version 2.0
3 // that can be found in the LICENSE file. 3 // that can be found in the LICENSE file.
4 4
5 package logs 5 package logs
6 6
7 import ( 7 import (
8 "bytes" 8 "bytes"
9 "errors" 9 "errors"
10 "fmt" 10 "fmt"
11 "math" 11 "math"
12 "testing" 12 "testing"
13 "time" 13 "time"
14 14
15 "github.com/luci/gae/filter/featureBreaker"
16 "github.com/luci/luci-go/common/config" 15 "github.com/luci/luci-go/common/config"
17 "github.com/luci/luci-go/common/data/recordio" 16 "github.com/luci/luci-go/common/data/recordio"
18 "github.com/luci/luci-go/common/iotools" 17 "github.com/luci/luci-go/common/iotools"
19 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1" 18 "github.com/luci/luci-go/logdog/api/endpoints/coordinator/logs/v1"
20 "github.com/luci/luci-go/logdog/api/logpb" 19 "github.com/luci/luci-go/logdog/api/logpb"
21 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest " 20 ct "github.com/luci/luci-go/logdog/appengine/coordinator/coordinatorTest "
22 "github.com/luci/luci-go/logdog/common/archive" 21 "github.com/luci/luci-go/logdog/common/archive"
23 "github.com/luci/luci-go/logdog/common/renderer" 22 "github.com/luci/luci-go/logdog/common/renderer"
24 "github.com/luci/luci-go/logdog/common/storage" 23 "github.com/luci/luci-go/logdog/common/storage"
24 "github.com/luci/luci-go/logdog/common/storage/bigtable"
25 "github.com/luci/luci-go/logdog/common/types" 25 "github.com/luci/luci-go/logdog/common/types"
26 26
27 "github.com/luci/gae/filter/featureBreaker"
28
27 "github.com/golang/protobuf/proto" 29 "github.com/golang/protobuf/proto"
28 "golang.org/x/net/context" 30 "golang.org/x/net/context"
29 31
30 . "github.com/luci/luci-go/common/testing/assertions" 32 . "github.com/luci/luci-go/common/testing/assertions"
31 . "github.com/smartystreets/goconvey/convey" 33 . "github.com/smartystreets/goconvey/convey"
32 ) 34 )
33 35
34 func shouldHaveLogs(actual interface{}, expected ...interface{}) string { 36 func shouldHaveLogs(actual interface{}, expected ...interface{}) string {
35 resp := actual.(*logdog.GetResponse) 37 resp := actual.(*logdog.GetResponse)
36 38
(...skipping 34 matching lines...) Expand 10 before | Expand all | Expand 10 after
71 trash.ReadFrom(r) 73 trash.ReadFrom(r)
72 } 74 }
73 } 75 }
74 76
75 func testGetImpl(t *testing.T, archived bool) { 77 func testGetImpl(t *testing.T, archived bool) {
76 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive d=%v)`, archived), t, func() { 78 Convey(fmt.Sprintf(`With a testing configuration, a Get request (archive d=%v)`, archived), t, func() {
77 c, env := ct.Install() 79 c, env := ct.Install()
78 80
79 svr := New() 81 svr := New()
80 82
83 // Use an actual BigTable testing instance for our intermediate storage,
84 // instead of the default in-memory instance.
85 is := bigtable.NewMemoryInstance(c, bigtable.Options{
86 Cache: env.Services.StorageCache(),
87 })
88 env.Services.IS = func() (storage.Storage, error) { return is, n il }
89
81 // di is a datastore bound to the test project namespace. 90 // di is a datastore bound to the test project namespace.
82 const project = config.ProjectName("proj-foo") 91 const project = config.ProjectName("proj-foo")
83 92
84 // Generate our test stream. 93 // Generate our test stream.
85 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar") 94 tls := ct.MakeStream(c, "proj-foo", "testing/+/foo/bar")
86 95
87 putLogStream := func(c context.Context) { 96 putLogStream := func(c context.Context) {
88 if err := tls.Put(c); err != nil { 97 if err := tls.Put(c); err != nil {
89 panic(err) 98 panic(err)
90 } 99 }
(...skipping 188 matching lines...) Expand 10 before | Expand all | Expand 10 after
279 }) 288 })
280 289
281 Convey(`Will fail with NotFound if the log path does not exist (different path).`, func() { 290 Convey(`Will fail with NotFound if the log path does not exist (different path).`, func() {
282 req.Path = "testing/+/does/not/exist" 291 req.Path = "testing/+/does/not/exist"
283 _, err := svr.Tail(c, &req) 292 _, err := svr.Tail(c, &req)
284 So(err, ShouldBeRPCNotFound) 293 So(err, ShouldBeRPCNotFound)
285 }) 294 })
286 }) 295 })
287 296
288 Convey(`When testing log data is added`, func() { 297 Convey(`When testing log data is added`, func() {
289 » » » if !archived { 298 » » » putLogData := func() {
290 » » » » // Add the logs to the in-memory temporary stora ge. 299 » » » » if !archived {
291 » » » » for _, le := range entries { 300 » » » » » // Add the logs to the in-memory tempora ry storage.
292 » » » » » err := env.IntermediateStorage.Put(stora ge.PutRequest{ 301 » » » » » for _, le := range entries {
293 » » » » » » Project: project, 302 » » » » » » err := is.Put(storage.PutRequest {
294 » » » » » » Path: tls.Path, 303 » » » » » » » Project: project,
295 » » » » » » Index: types.MessageIndex(le.S treamIndex), 304 » » » » » » » Path: tls.Path,
296 » » » » » » Values: [][]byte{protobufs[le.S treamIndex]}, 305 » » » » » » » Index: types.MessageIn dex(le.StreamIndex),
297 » » » » » }) 306 » » » » » » » Values: [][]byte{protob ufs[le.StreamIndex]},
298 » » » » » if err != nil { 307 » » » » » » })
299 » » » » » » panic(fmt.Errorf("failed to Put( ) LogEntry: %v", err)) 308 » » » » » » if err != nil {
309 » » » » » » » panic(fmt.Errorf("failed to Put() LogEntry: %v", err))
310 » » » » » » }
300 } 311 }
312 } else {
313 // Archive this log stream. We will gene rate one index entry for every
314 // 2 log entries.
315 src := renderer.StaticSource(entries)
316 var lbuf, ibuf bytes.Buffer
317 m := archive.Manifest{
318 Desc: tls.Desc,
319 Source: &src,
320 LogWriter: &lbuf,
321 IndexWriter: &ibuf,
322 StreamIndexRange: 2,
323 }
324 if err := archive.Archive(m); err != nil {
325 panic(err)
326 }
327
328 now := env.Clock.Now().UTC()
329
330 env.GSClient.Put("gs://testbucket/stream ", lbuf.Bytes())
331 env.GSClient.Put("gs://testbucket/index" , ibuf.Bytes())
332 tls.State.TerminatedTime = now
333 tls.State.ArchivedTime = now
334 tls.State.ArchiveStreamURL = "gs://testb ucket/stream"
335 tls.State.ArchiveIndexURL = "gs://testbu cket/index"
336
337 So(tls.State.ArchivalState().Archived(), ShouldBeTrue)
301 } 338 }
302 } else {
303 // Archive this log stream. We will generate one index entry for every
304 // 2 log entries.
305 src := renderer.StaticSource(entries)
306 var lbuf, ibuf bytes.Buffer
307 m := archive.Manifest{
308 Desc: tls.Desc,
309 Source: &src,
310 LogWriter: &lbuf,
311 IndexWriter: &ibuf,
312 StreamIndexRange: 2,
313 }
314 if err := archive.Archive(m); err != nil {
315 panic(err)
316 }
317
318 now := env.Clock.Now().UTC()
319
320 env.GSClient.Put("gs://testbucket/stream", lbuf. Bytes())
321 env.GSClient.Put("gs://testbucket/index", ibuf.B ytes())
322 tls.State.TerminatedTime = now
323 tls.State.ArchivedTime = now
324 tls.State.ArchiveStreamURL = "gs://testbucket/st ream"
325 tls.State.ArchiveIndexURL = "gs://testbucket/ind ex"
326
327 So(tls.State.ArchivalState().Archived(), ShouldB eTrue)
328 } 339 }
340 putLogData()
329 putLogStream(c) 341 putLogStream(c)
330 342
331 Convey(`Testing Get requests`, func() { 343 Convey(`Testing Get requests`, func() {
332 req := logdog.GetRequest{ 344 req := logdog.GetRequest{
333 Project: string(project), 345 Project: string(project),
334 Path: string(tls.Path), 346 Path: string(tls.Path),
335 } 347 }
336 348
337 Convey(`When the log stream is purged`, func() { 349 Convey(`When the log stream is purged`, func() {
338 tls.Stream.Purged = true 350 tls.Stream.Purged = true
(...skipping 19 matching lines...) Expand all
358 370
359 resp, err := svr.Get(c, &req) 371 resp, err := svr.Get(c, &req)
360 So(err, ShouldBeRPCOK) 372 So(err, ShouldBeRPCOK)
361 So(resp.Logs, ShouldHaveLength, 0) 373 So(resp.Logs, ShouldHaveLength, 0)
362 }) 374 })
363 375
364 Convey(`Will successfully retrieve a stream path .`, func() { 376 Convey(`Will successfully retrieve a stream path .`, func() {
365 resp, err := svr.Get(c, &req) 377 resp, err := svr.Get(c, &req)
366 So(err, ShouldBeRPCOK) 378 So(err, ShouldBeRPCOK)
367 So(resp, shouldHaveLogs, 0, 1, 2) 379 So(resp, shouldHaveLogs, 0, 1, 2)
380
381 Convey(`Will successfully retrieve the s tream path again (caching).`, func() {
382 // Re-put the log data, since th e previous request closed the
383 // storage instance.
384 putLogData()
385
386 resp, err := svr.Get(c, &req)
387 So(err, ShouldBeRPCOK)
388 So(resp, shouldHaveLogs, 0, 1, 2 )
389 })
368 }) 390 })
369 391
370 Convey(`Will successfully retrieve a stream path offset at 4.`, func() { 392 Convey(`Will successfully retrieve a stream path offset at 4.`, func() {
371 req.Index = 4 393 req.Index = 4
372 394
373 resp, err := svr.Get(c, &req) 395 resp, err := svr.Get(c, &req)
374 So(err, ShouldBeRPCOK) 396 So(err, ShouldBeRPCOK)
375 So(resp, shouldHaveLogs, 4, 5) 397 So(resp, shouldHaveLogs, 4, 5)
376 }) 398 })
377 399
(...skipping 84 matching lines...) Expand 10 before | Expand all | Expand 10 after
462 }) 484 })
463 485
464 Convey(`Will return Internal if the protobuf log entry data is corrupt.`, func() { 486 Convey(`Will return Internal if the protobuf log entry data is corrupt.`, func() {
465 if archived { 487 if archived {
466 // Corrupt the archive datastrea m. 488 // Corrupt the archive datastrea m.
467 stream := env.GSClient.Get("gs:/ /testbucket/stream") 489 stream := env.GSClient.Get("gs:/ /testbucket/stream")
468 zeroRecords(stream) 490 zeroRecords(stream)
469 } else { 491 } else {
470 // Add corrupted entry to Storag e. Create a new entry here, since 492 // Add corrupted entry to Storag e. Create a new entry here, since
471 // the storage will reject a dup licate/overwrite. 493 // the storage will reject a dup licate/overwrite.
472 » » » » » » err := env.IntermediateStorage.P ut(storage.PutRequest{ 494 » » » » » » err := is.Put(storage.PutRequest {
473 Project: project, 495 Project: project,
474 Path: types.StreamPat h(req.Path), 496 Path: types.StreamPat h(req.Path),
475 Index: 666, 497 Index: 666,
476 Values: [][]byte{{0x00} }, // Invalid protobuf, zero tag. 498 Values: [][]byte{{0x00} }, // Invalid protobuf, zero tag.
477 }) 499 })
478 if err != nil { 500 if err != nil {
479 panic(err) 501 panic(err)
480 } 502 }
481 req.Index = 666 503 req.Index = 666
482 } 504 }
483 505
484 _, err := svr.Get(c, &req) 506 _, err := svr.Get(c, &req)
485 So(err, ShouldBeRPCInternal) 507 So(err, ShouldBeRPCInternal)
486 }) 508 })
487 509
488 Convey(`Will successfully retrieve both logs and stream state.`, func() { 510 Convey(`Will successfully retrieve both logs and stream state.`, func() {
489 req.State = true 511 req.State = true
490 512
491 resp, err := svr.Get(c, &req) 513 resp, err := svr.Get(c, &req)
492 So(err, ShouldBeRPCOK) 514 So(err, ShouldBeRPCOK)
493 So(resp.State, ShouldResemble, buildLogS treamState(tls.Stream, tls.State)) 515 So(resp.State, ShouldResemble, buildLogS treamState(tls.Stream, tls.State))
494 So(resp, shouldHaveLogs, 0, 1, 2) 516 So(resp, shouldHaveLogs, 0, 1, 2)
495 }) 517 })
496 518
497 Convey(`Will return Internal if the Storage is n ot working.`, func() { 519 Convey(`Will return Internal if the Storage is n ot working.`, func() {
498 if archived { 520 if archived {
499 env.GSClient["error"] = []byte(" test error") 521 env.GSClient["error"] = []byte(" test error")
500 } else { 522 } else {
501 » » » » » » env.IntermediateStorage.Close() 523 » » » » » » is.SetErr(errors.New("not workin g"))
502 } 524 }
503 525
504 _, err := svr.Get(c, &req) 526 _, err := svr.Get(c, &req)
505 So(err, ShouldBeRPCInternal) 527 So(err, ShouldBeRPCInternal)
506 }) 528 })
507 529
508 Convey(`Will enforce a maximum count of 2.`, fun c() { 530 Convey(`Will enforce a maximum count of 2.`, fun c() {
509 req.LogCount = 2 531 req.LogCount = 2
510 resp, err := svr.Get(c, &req) 532 resp, err := svr.Get(c, &req)
511 So(err, ShouldBeRPCOK) 533 So(err, ShouldBeRPCOK)
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
585 Project: string(project), 607 Project: string(project),
586 Path: string(tls.Path), 608 Path: string(tls.Path),
587 State: true, 609 State: true,
588 } 610 }
589 611
590 Convey(`Will successfully retrieve a stream path .`, func() { 612 Convey(`Will successfully retrieve a stream path .`, func() {
591 resp, err := svr.Tail(c, &req) 613 resp, err := svr.Tail(c, &req)
592 So(err, ShouldBeRPCOK) 614 So(err, ShouldBeRPCOK)
593 So(resp, shouldHaveLogs, 7) 615 So(resp, shouldHaveLogs, 7)
594 So(resp.State, ShouldResemble, buildLogS treamState(tls.Stream, tls.State)) 616 So(resp.State, ShouldResemble, buildLogS treamState(tls.Stream, tls.State))
617
618 // For non-archival: 1 miss and 1 put, f or the tail row.
619 // For archival: 1 miss and 1 put, for t he index.
620 So(env.StorageCache.Stats(), ShouldResem ble, ct.StorageCacheStats{Puts: 1, Misses: 1})
621
622 Convey(`Will retrieve the stream path ag ain (caching).`, func() {
623 // Re-put the log data, since th e previous request closed the
624 // storage instance.
625 putLogData()
626 env.StorageCache.Clear()
627
628 resp, err := svr.Tail(c, &req)
629 So(err, ShouldBeRPCOK)
630 So(resp, shouldHaveLogs, 7)
631 So(resp.State, ShouldResemble, b uildLogStreamState(tls.Stream, tls.State))
632
633 // For non-archival: 1 hit, for the tail row.
634 // For archival: 1 hit, for the index.
635 So(env.StorageCache.Stats(), Sho uldResemble, ct.StorageCacheStats{Hits: 1})
636 })
595 }) 637 })
596 }) 638 })
597 }) 639 })
598 }) 640 })
599 } 641 }
600 642
601 func TestGetIntermediate(t *testing.T) { 643 func TestGetIntermediate(t *testing.T) {
602 t.Parallel() 644 t.Parallel()
603 645
604 testGetImpl(t, false) 646 testGetImpl(t, false)
605 } 647 }
606 648
607 func TestGetArchived(t *testing.T) { 649 func TestGetArchived(t *testing.T) {
608 t.Parallel() 650 t.Parallel()
609 651
610 » testGetImpl(t, false) 652 » testGetImpl(t, true)
611 } 653 }
OLDNEW
« no previous file with comments | « logdog/appengine/coordinator/endpoints/logs/get.go ('k') | logdog/appengine/coordinator/service.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698