| Index: logdog/appengine/coordinator/endpoints/logs/get_test.go
|
| diff --git a/logdog/appengine/coordinator/endpoints/logs/get_test.go b/logdog/appengine/coordinator/endpoints/logs/get_test.go
|
| index db4f11b5a78d71e7ce82408ae9d06cf4dc061a6e..eed79dc3121c907d791077ba9def148f63f31405 100644
|
| --- a/logdog/appengine/coordinator/endpoints/logs/get_test.go
|
| +++ b/logdog/appengine/coordinator/endpoints/logs/get_test.go
|
| @@ -12,7 +12,6 @@ import (
|
| "testing"
|
| "time"
|
|
|
| - "github.com/luci/gae/filter/featureBreaker"
|
| "github.com/luci/luci-go/common/config"
|
| "github.com/luci/luci-go/common/data/recordio"
|
| "github.com/luci/luci-go/common/iotools"
|
| @@ -22,8 +21,11 @@ import (
|
| "github.com/luci/luci-go/logdog/common/archive"
|
| "github.com/luci/luci-go/logdog/common/renderer"
|
| "github.com/luci/luci-go/logdog/common/storage"
|
| + "github.com/luci/luci-go/logdog/common/storage/bigtable"
|
| "github.com/luci/luci-go/logdog/common/types"
|
|
|
| + "github.com/luci/gae/filter/featureBreaker"
|
| +
|
| "github.com/golang/protobuf/proto"
|
| "golang.org/x/net/context"
|
|
|
| @@ -78,6 +80,13 @@ func testGetImpl(t *testing.T, archived bool) {
|
|
|
| svr := New()
|
|
|
| + // Use an actual BigTable testing instance for our intermediate storage,
|
| + // instead of the default in-memory instance.
|
| + is := bigtable.NewMemoryInstance(c, bigtable.Options{
|
| + Cache: env.Services.StorageCache(),
|
| + })
|
| + env.Services.IS = func() (storage.Storage, error) { return is, nil }
|
| +
|
| // di is a datastore bound to the test project namespace.
|
| const project = config.ProjectName("proj-foo")
|
|
|
| @@ -286,46 +295,49 @@ func testGetImpl(t *testing.T, archived bool) {
|
| })
|
|
|
| Convey(`When testing log data is added`, func() {
|
| - if !archived {
|
| - // Add the logs to the in-memory temporary storage.
|
| - for _, le := range entries {
|
| - err := env.IntermediateStorage.Put(storage.PutRequest{
|
| - Project: project,
|
| - Path: tls.Path,
|
| - Index: types.MessageIndex(le.StreamIndex),
|
| - Values: [][]byte{protobufs[le.StreamIndex]},
|
| - })
|
| - if err != nil {
|
| - panic(fmt.Errorf("failed to Put() LogEntry: %v", err))
|
| + putLogData := func() {
|
| + if !archived {
|
| + // Add the logs to the in-memory temporary storage.
|
| + for _, le := range entries {
|
| + err := is.Put(storage.PutRequest{
|
| + Project: project,
|
| + Path: tls.Path,
|
| + Index: types.MessageIndex(le.StreamIndex),
|
| + Values: [][]byte{protobufs[le.StreamIndex]},
|
| + })
|
| + if err != nil {
|
| + panic(fmt.Errorf("failed to Put() LogEntry: %v", err))
|
| + }
|
| + }
|
| + } else {
|
| + // Archive this log stream. We will generate one index entry for every
|
| + // 2 log entries.
|
| + src := renderer.StaticSource(entries)
|
| + var lbuf, ibuf bytes.Buffer
|
| + m := archive.Manifest{
|
| + Desc: tls.Desc,
|
| + Source: &src,
|
| + LogWriter: &lbuf,
|
| + IndexWriter: &ibuf,
|
| + StreamIndexRange: 2,
|
| + }
|
| + if err := archive.Archive(m); err != nil {
|
| + panic(err)
|
| }
|
| - }
|
| - } else {
|
| - // Archive this log stream. We will generate one index entry for every
|
| - // 2 log entries.
|
| - src := renderer.StaticSource(entries)
|
| - var lbuf, ibuf bytes.Buffer
|
| - m := archive.Manifest{
|
| - Desc: tls.Desc,
|
| - Source: &src,
|
| - LogWriter: &lbuf,
|
| - IndexWriter: &ibuf,
|
| - StreamIndexRange: 2,
|
| - }
|
| - if err := archive.Archive(m); err != nil {
|
| - panic(err)
|
| - }
|
|
|
| - now := env.Clock.Now().UTC()
|
| + now := env.Clock.Now().UTC()
|
|
|
| - env.GSClient.Put("gs://testbucket/stream", lbuf.Bytes())
|
| - env.GSClient.Put("gs://testbucket/index", ibuf.Bytes())
|
| - tls.State.TerminatedTime = now
|
| - tls.State.ArchivedTime = now
|
| - tls.State.ArchiveStreamURL = "gs://testbucket/stream"
|
| - tls.State.ArchiveIndexURL = "gs://testbucket/index"
|
| + env.GSClient.Put("gs://testbucket/stream", lbuf.Bytes())
|
| + env.GSClient.Put("gs://testbucket/index", ibuf.Bytes())
|
| + tls.State.TerminatedTime = now
|
| + tls.State.ArchivedTime = now
|
| + tls.State.ArchiveStreamURL = "gs://testbucket/stream"
|
| + tls.State.ArchiveIndexURL = "gs://testbucket/index"
|
|
|
| - So(tls.State.ArchivalState().Archived(), ShouldBeTrue)
|
| + So(tls.State.ArchivalState().Archived(), ShouldBeTrue)
|
| + }
|
| }
|
| + putLogData()
|
| putLogStream(c)
|
|
|
| Convey(`Testing Get requests`, func() {
|
| @@ -365,6 +377,16 @@ func testGetImpl(t *testing.T, archived bool) {
|
| resp, err := svr.Get(c, &req)
|
| So(err, ShouldBeRPCOK)
|
| So(resp, shouldHaveLogs, 0, 1, 2)
|
| +
|
| + Convey(`Will successfully retrieve the stream path again (caching).`, func() {
|
| + // Re-put the log data, since the previous request closed the
|
| + // storage instance.
|
| + putLogData()
|
| +
|
| + resp, err := svr.Get(c, &req)
|
| + So(err, ShouldBeRPCOK)
|
| + So(resp, shouldHaveLogs, 0, 1, 2)
|
| + })
|
| })
|
|
|
| Convey(`Will successfully retrieve a stream path offset at 4.`, func() {
|
| @@ -469,7 +491,7 @@ func testGetImpl(t *testing.T, archived bool) {
|
| } else {
|
| // Add corrupted entry to Storage. Create a new entry here, since
|
| // the storage will reject a duplicate/overwrite.
|
| - err := env.IntermediateStorage.Put(storage.PutRequest{
|
| + err := is.Put(storage.PutRequest{
|
| Project: project,
|
| Path: types.StreamPath(req.Path),
|
| Index: 666,
|
| @@ -498,7 +520,7 @@ func testGetImpl(t *testing.T, archived bool) {
|
| if archived {
|
| env.GSClient["error"] = []byte("test error")
|
| } else {
|
| - env.IntermediateStorage.Close()
|
| + is.SetErr(errors.New("not working"))
|
| }
|
|
|
| _, err := svr.Get(c, &req)
|
| @@ -592,6 +614,26 @@ func testGetImpl(t *testing.T, archived bool) {
|
| So(err, ShouldBeRPCOK)
|
| So(resp, shouldHaveLogs, 7)
|
| So(resp.State, ShouldResemble, buildLogStreamState(tls.Stream, tls.State))
|
| +
|
| + // For non-archival: 1 miss and 1 put, for the tail row.
|
| + // For archival: 1 miss and 1 put, for the index.
|
| + So(env.StorageCache.Stats(), ShouldResemble, ct.StorageCacheStats{Puts: 1, Misses: 1})
|
| +
|
| + Convey(`Will retrieve the stream path again (caching).`, func() {
|
| + // Re-put the log data, since the previous request closed the
|
| + // storage instance.
|
| + putLogData()
|
| + env.StorageCache.Clear()
|
| +
|
| + resp, err := svr.Tail(c, &req)
|
| + So(err, ShouldBeRPCOK)
|
| + So(resp, shouldHaveLogs, 7)
|
| + So(resp.State, ShouldResemble, buildLogStreamState(tls.Stream, tls.State))
|
| +
|
| + // For non-archival: 1 hit, for the tail row.
|
| + // For archival: 1 hit, for the index.
|
| + So(env.StorageCache.Stats(), ShouldResemble, ct.StorageCacheStats{Hits: 1})
|
| + })
|
| })
|
| })
|
| })
|
| @@ -607,5 +649,5 @@ func TestGetIntermediate(t *testing.T) {
|
| func TestGetArchived(t *testing.T) {
|
| t.Parallel()
|
|
|
| - testGetImpl(t, false)
|
| + testGetImpl(t, true)
|
| }
|
|
|