Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(59)

Unified Diff: logdog/appengine/coordinator/endpoints/logs/get_test.go

Issue 2435113002: LogDog: Add Storage-layer data caching. (Closed)
Patch Set: Fix byteLimit bug. Created 4 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « logdog/appengine/coordinator/endpoints/logs/get.go ('k') | logdog/appengine/coordinator/service.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: logdog/appengine/coordinator/endpoints/logs/get_test.go
diff --git a/logdog/appengine/coordinator/endpoints/logs/get_test.go b/logdog/appengine/coordinator/endpoints/logs/get_test.go
index db4f11b5a78d71e7ce82408ae9d06cf4dc061a6e..eed79dc3121c907d791077ba9def148f63f31405 100644
--- a/logdog/appengine/coordinator/endpoints/logs/get_test.go
+++ b/logdog/appengine/coordinator/endpoints/logs/get_test.go
@@ -12,7 +12,6 @@ import (
"testing"
"time"
- "github.com/luci/gae/filter/featureBreaker"
"github.com/luci/luci-go/common/config"
"github.com/luci/luci-go/common/data/recordio"
"github.com/luci/luci-go/common/iotools"
@@ -22,8 +21,11 @@ import (
"github.com/luci/luci-go/logdog/common/archive"
"github.com/luci/luci-go/logdog/common/renderer"
"github.com/luci/luci-go/logdog/common/storage"
+ "github.com/luci/luci-go/logdog/common/storage/bigtable"
"github.com/luci/luci-go/logdog/common/types"
+ "github.com/luci/gae/filter/featureBreaker"
+
"github.com/golang/protobuf/proto"
"golang.org/x/net/context"
@@ -78,6 +80,13 @@ func testGetImpl(t *testing.T, archived bool) {
svr := New()
+ // Use an actual BigTable testing instance for our intermediate storage,
+ // instead of the default in-memory instance.
+ is := bigtable.NewMemoryInstance(c, bigtable.Options{
+ Cache: env.Services.StorageCache(),
+ })
+ env.Services.IS = func() (storage.Storage, error) { return is, nil }
+
// di is a datastore bound to the test project namespace.
const project = config.ProjectName("proj-foo")
@@ -286,46 +295,49 @@ func testGetImpl(t *testing.T, archived bool) {
})
Convey(`When testing log data is added`, func() {
- if !archived {
- // Add the logs to the in-memory temporary storage.
- for _, le := range entries {
- err := env.IntermediateStorage.Put(storage.PutRequest{
- Project: project,
- Path: tls.Path,
- Index: types.MessageIndex(le.StreamIndex),
- Values: [][]byte{protobufs[le.StreamIndex]},
- })
- if err != nil {
- panic(fmt.Errorf("failed to Put() LogEntry: %v", err))
+ putLogData := func() {
+ if !archived {
+ // Add the logs to the in-memory temporary storage.
+ for _, le := range entries {
+ err := is.Put(storage.PutRequest{
+ Project: project,
+ Path: tls.Path,
+ Index: types.MessageIndex(le.StreamIndex),
+ Values: [][]byte{protobufs[le.StreamIndex]},
+ })
+ if err != nil {
+ panic(fmt.Errorf("failed to Put() LogEntry: %v", err))
+ }
+ }
+ } else {
+ // Archive this log stream. We will generate one index entry for every
+ // 2 log entries.
+ src := renderer.StaticSource(entries)
+ var lbuf, ibuf bytes.Buffer
+ m := archive.Manifest{
+ Desc: tls.Desc,
+ Source: &src,
+ LogWriter: &lbuf,
+ IndexWriter: &ibuf,
+ StreamIndexRange: 2,
+ }
+ if err := archive.Archive(m); err != nil {
+ panic(err)
}
- }
- } else {
- // Archive this log stream. We will generate one index entry for every
- // 2 log entries.
- src := renderer.StaticSource(entries)
- var lbuf, ibuf bytes.Buffer
- m := archive.Manifest{
- Desc: tls.Desc,
- Source: &src,
- LogWriter: &lbuf,
- IndexWriter: &ibuf,
- StreamIndexRange: 2,
- }
- if err := archive.Archive(m); err != nil {
- panic(err)
- }
- now := env.Clock.Now().UTC()
+ now := env.Clock.Now().UTC()
- env.GSClient.Put("gs://testbucket/stream", lbuf.Bytes())
- env.GSClient.Put("gs://testbucket/index", ibuf.Bytes())
- tls.State.TerminatedTime = now
- tls.State.ArchivedTime = now
- tls.State.ArchiveStreamURL = "gs://testbucket/stream"
- tls.State.ArchiveIndexURL = "gs://testbucket/index"
+ env.GSClient.Put("gs://testbucket/stream", lbuf.Bytes())
+ env.GSClient.Put("gs://testbucket/index", ibuf.Bytes())
+ tls.State.TerminatedTime = now
+ tls.State.ArchivedTime = now
+ tls.State.ArchiveStreamURL = "gs://testbucket/stream"
+ tls.State.ArchiveIndexURL = "gs://testbucket/index"
- So(tls.State.ArchivalState().Archived(), ShouldBeTrue)
+ So(tls.State.ArchivalState().Archived(), ShouldBeTrue)
+ }
}
+ putLogData()
putLogStream(c)
Convey(`Testing Get requests`, func() {
@@ -365,6 +377,16 @@ func testGetImpl(t *testing.T, archived bool) {
resp, err := svr.Get(c, &req)
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogs, 0, 1, 2)
+
+ Convey(`Will successfully retrieve the stream path again (caching).`, func() {
+ // Re-put the log data, since the previous request closed the
+ // storage instance.
+ putLogData()
+
+ resp, err := svr.Get(c, &req)
+ So(err, ShouldBeRPCOK)
+ So(resp, shouldHaveLogs, 0, 1, 2)
+ })
})
Convey(`Will successfully retrieve a stream path offset at 4.`, func() {
@@ -469,7 +491,7 @@ func testGetImpl(t *testing.T, archived bool) {
} else {
// Add corrupted entry to Storage. Create a new entry here, since
// the storage will reject a duplicate/overwrite.
- err := env.IntermediateStorage.Put(storage.PutRequest{
+ err := is.Put(storage.PutRequest{
Project: project,
Path: types.StreamPath(req.Path),
Index: 666,
@@ -498,7 +520,7 @@ func testGetImpl(t *testing.T, archived bool) {
if archived {
env.GSClient["error"] = []byte("test error")
} else {
- env.IntermediateStorage.Close()
+ is.SetErr(errors.New("not working"))
}
_, err := svr.Get(c, &req)
@@ -592,6 +614,26 @@ func testGetImpl(t *testing.T, archived bool) {
So(err, ShouldBeRPCOK)
So(resp, shouldHaveLogs, 7)
So(resp.State, ShouldResemble, buildLogStreamState(tls.Stream, tls.State))
+
+ // For non-archival: 1 miss and 1 put, for the tail row.
+ // For archival: 1 miss and 1 put, for the index.
+ So(env.StorageCache.Stats(), ShouldResemble, ct.StorageCacheStats{Puts: 1, Misses: 1})
+
+ Convey(`Will retrieve the stream path again (caching).`, func() {
+ // Re-put the log data, since the previous request closed the
+ // storage instance.
+ putLogData()
+ env.StorageCache.Clear()
+
+ resp, err := svr.Tail(c, &req)
+ So(err, ShouldBeRPCOK)
+ So(resp, shouldHaveLogs, 7)
+ So(resp.State, ShouldResemble, buildLogStreamState(tls.Stream, tls.State))
+
+ // For non-archival: 1 hit, for the tail row.
+ // For archival: 1 hit, for the index.
+ So(env.StorageCache.Stats(), ShouldResemble, ct.StorageCacheStats{Hits: 1})
+ })
})
})
})
@@ -607,5 +649,5 @@ func TestGetIntermediate(t *testing.T) {
func TestGetArchived(t *testing.T) {
t.Parallel()
- testGetImpl(t, false)
+ testGetImpl(t, true)
}
« no previous file with comments | « logdog/appengine/coordinator/endpoints/logs/get.go ('k') | logdog/appengine/coordinator/service.go » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698